1use std::{
9 collections::{HashMap, VecDeque},
10 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
11 time::Duration,
12};
13
14use crate::shared::ConnectionId;
15use tracing::{debug, info, trace, warn};
16
17use crate::{Instant, VarInt};
18
19#[derive(Debug)]
24pub(super) struct NatTraversalState {
25 pub(super) role: NatTraversalRole,
27 pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
29 pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
31 pub(super) candidate_pairs: Vec<CandidatePair>,
33 pub(super) pair_index: HashMap<SocketAddr, usize>,
35 pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
37 pub(super) coordination: Option<CoordinationState>,
39 pub(super) next_sequence: VarInt,
41 pub(super) max_candidates: u32,
43 pub(super) coordination_timeout: Duration,
45 pub(super) stats: NatTraversalStats,
47 pub(super) security_state: SecurityValidationState,
49 pub(super) network_monitor: NetworkConditionMonitor,
51 pub(super) resource_manager: ResourceCleanupCoordinator,
53 pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
55}
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub enum NatTraversalRole {
59 Client,
61 Server {
63 can_relay: bool,
65 },
66 Bootstrap,
68}
69#[derive(Debug, Clone)]
71pub(super) struct AddressCandidate {
72 pub(super) address: SocketAddr,
74 pub(super) priority: u32,
76 pub(super) source: CandidateSource,
78 pub(super) discovered_at: Instant,
80 pub(super) state: CandidateState,
82 pub(super) attempt_count: u32,
84 pub(super) last_attempt: Option<Instant>,
86}
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum CandidateSource {
90 Local,
92 Observed {
97 by_node: Option<VarInt>,
99 },
100 Peer,
102 Predicted,
104}
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum CandidateState {
108 New,
110 Validating,
112 Valid,
114 Failed,
116 Removed,
118}
119#[derive(Debug)]
121#[allow(dead_code)]
122pub(super) struct PathValidationState {
123 pub(super) challenge: u64,
125 pub(super) sent_at: Instant,
127 pub(super) retry_count: u32,
129 pub(super) max_retries: u32,
131 pub(super) coordination_round: Option<VarInt>,
133 pub(super) timeout_state: AdaptiveTimeoutState,
135 pub(super) last_retry_at: Option<Instant>,
137}
138#[derive(Debug)]
140#[allow(dead_code)]
141pub(super) struct CoordinationState {
142 pub(super) round: VarInt,
144 pub(super) punch_targets: Vec<PunchTarget>,
146 pub(super) round_start: Instant,
148 pub(super) punch_start: Instant,
150 pub(super) round_duration: Duration,
152 pub(super) state: CoordinationPhase,
154 pub(super) punch_request_sent: bool,
156 pub(super) peer_punch_received: bool,
158 pub(super) retry_count: u32,
160 pub(super) max_retries: u32,
162 pub(super) timeout_state: AdaptiveTimeoutState,
164 pub(super) last_retry_at: Option<Instant>,
166}
167#[derive(Debug, Clone, Copy, PartialEq, Eq)]
169#[allow(dead_code)]
170pub(crate) enum CoordinationPhase {
171 Idle,
173 Requesting,
175 Coordinating,
177 Preparing,
179 Punching,
181 Validating,
183 Succeeded,
185 Failed,
187}
188#[derive(Debug, Clone)]
190pub(super) struct PunchTarget {
191 pub(super) remote_addr: SocketAddr,
193 pub(super) remote_sequence: VarInt,
195 pub(super) challenge: u64,
197}
198#[derive(Debug, Clone, PartialEq, Eq)]
200pub(super) enum TimeoutAction {
201 RetryDiscovery,
203 RetryCoordination,
205 StartValidation,
207 Complete,
209 Failed,
211}
212
213#[derive(Debug, Clone)]
215#[allow(dead_code)]
216pub(super) struct CandidatePair {
217 pub(super) remote_sequence: VarInt,
219 pub(super) local_addr: SocketAddr,
221 pub(super) remote_addr: SocketAddr,
223 pub(super) priority: u64,
225 pub(super) state: PairState,
227 pub(super) pair_type: PairType,
229 pub(super) created_at: Instant,
231 pub(super) last_check: Option<Instant>,
233}
234#[derive(Debug, Clone, Copy, PartialEq, Eq)]
236#[allow(dead_code)]
237pub(super) enum PairState {
238 Waiting,
240 Succeeded,
242 Failed,
244 Frozen,
246}
247#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
249pub(super) enum PairType {
250 HostToHost,
252 HostToServerReflexive,
254 ServerReflexiveToHost,
256 ServerReflexiveToServerReflexive,
258 PeerReflexive,
260}
261#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub(super) enum CandidateType {
264 Host,
266 ServerReflexive,
268 PeerReflexive,
270}
271
272#[allow(dead_code)]
275fn calculate_candidate_priority(
276 candidate_type: CandidateType,
277 local_preference: u16,
278 component_id: u8,
279) -> u32 {
280 let type_preference = match candidate_type {
281 CandidateType::Host => 126,
282 CandidateType::PeerReflexive => 110,
283 CandidateType::ServerReflexive => 100,
284 };
285 (1u32 << 24) * type_preference + (1u32 << 8) * local_preference as u32 + component_id as u32
287}
288
289fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
292 let g = local_priority as u64;
293 let d = remote_priority as u64;
294 (1u64 << 32) * g.min(d) + 2 * g.max(d) + if g > d { 1 } else { 0 }
296}
297
298fn classify_candidate_type(source: CandidateSource) -> CandidateType {
300 match source {
301 CandidateSource::Local => CandidateType::Host,
302 CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
303 CandidateSource::Peer => CandidateType::PeerReflexive,
304 CandidateSource::Predicted => CandidateType::ServerReflexive, }
306}
307fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
309 match (local_type, remote_type) {
310 (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
311 (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
312 (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
313 (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => {
314 PairType::ServerReflexiveToServerReflexive
315 }
316 (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => {
317 PairType::PeerReflexive
318 }
319 }
320}
321fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
323 match (local.address, remote.address) {
325 (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
326 (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
327 _ => false, }
329}
330#[derive(Debug, Default, Clone)]
332#[allow(dead_code)]
333pub(crate) struct NatTraversalStats {
334 pub(super) remote_candidates_received: u32,
336 pub(super) local_candidates_sent: u32,
338 pub(super) validations_succeeded: u32,
340 pub(super) validations_failed: u32,
342 pub(super) coordination_rounds: u32,
344 pub(super) successful_coordinations: u32,
346 pub(super) failed_coordinations: u32,
348 pub(super) timed_out_coordinations: u32,
350 pub(super) coordination_failures: u32,
352 pub(super) direct_connections: u32,
354 pub(super) security_rejections: u32,
356 pub(super) rate_limit_violations: u32,
358 pub(super) invalid_address_rejections: u32,
360 pub(super) suspicious_coordination_attempts: u32,
362}
363#[derive(Debug)]
365#[allow(dead_code)]
366pub(super) struct SecurityValidationState {
367 candidate_rate_tracker: VecDeque<Instant>,
369 max_candidates_per_window: u32,
371 rate_window: Duration,
373 coordination_requests: VecDeque<CoordinationRequest>,
375 max_coordination_per_window: u32,
377 address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
379 validation_cache_timeout: Duration,
381}
382#[derive(Debug, Clone)]
384struct CoordinationRequest {
385 timestamp: Instant,
387}
388#[derive(Debug, Clone, Copy, PartialEq, Eq)]
390enum AddressValidationResult {
391 Valid,
393 Invalid,
395 Suspicious,
397}
398#[derive(Debug, Clone)]
400pub(super) struct AdaptiveTimeoutState {
401 current_timeout: Duration,
403 min_timeout: Duration,
405 max_timeout: Duration,
407 base_timeout: Duration,
409 backoff_multiplier: f64,
411 max_backoff_multiplier: f64,
413 jitter_factor: f64,
415 srtt: Option<Duration>,
417 rttvar: Option<Duration>,
419 last_rtt: Option<Duration>,
421 consecutive_timeouts: u32,
423 successful_responses: u32,
425}
426#[derive(Debug)]
428#[allow(dead_code)]
429pub(super) struct NetworkConditionMonitor {
430 rtt_samples: VecDeque<Duration>,
432 max_samples: usize,
434 packet_loss_rate: f64,
436 congestion_window: u32,
438 quality_score: f64,
440 last_quality_update: Instant,
442 quality_update_interval: Duration,
444 timeout_stats: TimeoutStatistics,
446}
447#[derive(Debug, Default)]
449struct TimeoutStatistics {
450 total_timeouts: u64,
452 total_responses: u64,
454 avg_response_time: Duration,
456 timeout_rate: f64,
458 last_update: Option<Instant>,
460}
461#[allow(dead_code)]
462impl SecurityValidationState {
463 fn new() -> Self {
465 Self {
466 candidate_rate_tracker: VecDeque::new(),
467 max_candidates_per_window: 20, rate_window: Duration::from_secs(60),
469 coordination_requests: VecDeque::new(),
470 max_coordination_per_window: 5, address_validation_cache: HashMap::new(),
472 validation_cache_timeout: Duration::from_secs(300), }
474 }
475 fn new_with_limits(
477 max_candidates_per_window: u32,
478 max_coordination_per_window: u32,
479 rate_window: Duration,
480 ) -> Self {
481 Self {
482 candidate_rate_tracker: VecDeque::new(),
483 max_candidates_per_window,
484 rate_window,
485 coordination_requests: VecDeque::new(),
486 max_coordination_per_window,
487 address_validation_cache: HashMap::new(),
488 validation_cache_timeout: Duration::from_secs(300),
489 }
490 }
491 fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
496 self.cleanup_rate_tracker(now);
498 self.cleanup_coordination_tracker(now);
499 let _current_candidate_rate =
501 self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
502 let _current_coordination_rate =
503 self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
504
505 let peer_reputation = self.calculate_peer_reputation(peer_id);
507 let adaptive_candidate_limit =
508 (self.max_candidates_per_window as f64 * peer_reputation) as u32;
509 let adaptive_coordination_limit =
510 (self.max_coordination_per_window as f64 * peer_reputation) as u32;
511
512 if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
514 debug!(
515 "Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
516 hex::encode(&peer_id[..8]),
517 self.candidate_rate_tracker.len(),
518 adaptive_candidate_limit
519 );
520 return true;
521 }
522
523 if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
524 debug!(
525 "Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
526 hex::encode(&peer_id[..8]),
527 self.coordination_requests.len(),
528 adaptive_coordination_limit
529 );
530 return true;
531 }
532
533 false
534 }
535
536 fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
541 1.0
550 }
551
552 fn validate_amplification_limits(
557 &mut self,
558 source_addr: SocketAddr,
559 target_addr: SocketAddr,
560 now: Instant,
561 ) -> Result<(), NatTraversalError> {
562 let amplification_key = (source_addr, target_addr);
564 if self.is_amplification_suspicious(amplification_key, now) {
573 warn!(
574 "Potential amplification attack detected: {} -> {}",
575 source_addr, target_addr
576 );
577 return Err(NatTraversalError::SuspiciousCoordination);
578 }
579
580 Ok(())
581 }
582
583 fn is_amplification_suspicious(
585 &self,
586 _amplification_key: (SocketAddr, SocketAddr),
587 _now: Instant,
588 ) -> bool {
589 false
598 }
599
600 fn generate_secure_coordination_round(&self) -> VarInt {
605 let secure_random: u64 = rand::random();
607 let bounded_random = secure_random % 1000000; VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
611 }
612
613 fn enhanced_address_validation(
621 &mut self,
622 addr: SocketAddr,
623 source_addr: SocketAddr,
624 now: Instant,
625 ) -> Result<AddressValidationResult, NatTraversalError> {
626 let basic_result = self.validate_address(addr, now);
628 match basic_result {
629 AddressValidationResult::Invalid => {
630 return Err(NatTraversalError::InvalidAddress);
631 }
632 AddressValidationResult::Suspicious => {
633 return Err(NatTraversalError::SuspiciousCoordination);
634 }
635 AddressValidationResult::Valid => {
636 }
638 }
639
640 self.validate_amplification_limits(source_addr, addr, now)?;
642
643 if self.is_address_in_suspicious_range(addr) {
645 warn!("Address in suspicious range detected: {}", addr);
646 return Err(NatTraversalError::SuspiciousCoordination);
647 }
648
649 if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
650 warn!(
651 "Suspicious coordination pattern detected: {} -> {}",
652 source_addr, addr
653 );
654 return Err(NatTraversalError::SuspiciousCoordination);
655 }
656
657 Ok(AddressValidationResult::Valid)
658 }
659
660 fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
662 match addr.ip() {
663 IpAddr::V4(ipv4) => {
664 let octets = ipv4.octets();
666 if octets[0] == 0 || octets[0] == 127 {
668 return true;
669 }
670
671 if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
673 return true;
674 }
675 if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
676 return true;
677 }
678 if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
679 return true;
680 }
681
682 false
683 }
684 IpAddr::V6(ipv6) => {
685 if ipv6.is_loopback() || ipv6.is_unspecified() {
687 return true;
688 }
689
690 let segments = ipv6.segments();
692 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
693 return true;
694 }
695
696 false
697 }
698 }
699 }
700
701 fn is_coordination_pattern_suspicious(
703 &self,
704 _source_addr: SocketAddr,
705 _target_addr: SocketAddr,
706 _now: Instant,
707 ) -> bool {
708 false
717 }
718
719 fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
721 self.cleanup_rate_tracker(now);
723 if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
725 return true;
726 }
727
728 self.candidate_rate_tracker.push_back(now);
730 false
731 }
732
733 fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
735 self.cleanup_coordination_tracker(now);
737 if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
739 return true;
740 }
741
742 let request = CoordinationRequest { timestamp: now };
744 self.coordination_requests.push_back(request);
745 false
746 }
747
748 fn cleanup_rate_tracker(&mut self, now: Instant) {
750 let cutoff = now - self.rate_window;
751 while let Some(&front_time) = self.candidate_rate_tracker.front() {
752 if front_time < cutoff {
753 self.candidate_rate_tracker.pop_front();
754 } else {
755 break;
756 }
757 }
758 }
759 fn cleanup_coordination_tracker(&mut self, now: Instant) {
761 let cutoff = now - self.rate_window;
762 while let Some(front_request) = self.coordination_requests.front() {
763 if front_request.timestamp < cutoff {
764 self.coordination_requests.pop_front();
765 } else {
766 break;
767 }
768 }
769 }
770 fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
772 if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
774 return cached_result;
775 }
776 let result = self.perform_address_validation(addr);
777
778 self.address_validation_cache.insert(addr, result);
780
781 if self.address_validation_cache.len() > 1000 {
783 self.cleanup_address_cache(now);
784 }
785
786 result
787 }
788
789 fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
791 match addr.ip() {
792 IpAddr::V4(ipv4) => {
793 if ipv4.is_unspecified() || ipv4.is_broadcast() {
795 return AddressValidationResult::Invalid;
796 }
797 if ipv4.is_multicast() || ipv4.is_documentation() {
799 return AddressValidationResult::Suspicious;
800 }
801
802 if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
804 return AddressValidationResult::Invalid;
805 }
806
807 if self.is_suspicious_ipv4(ipv4) {
809 return AddressValidationResult::Suspicious;
810 }
811 }
812 IpAddr::V6(ipv6) => {
813 if ipv6.is_unspecified() || ipv6.is_multicast() {
815 return AddressValidationResult::Invalid;
816 }
817
818 if self.is_suspicious_ipv6(ipv6) {
820 return AddressValidationResult::Suspicious;
821 }
822 }
823 }
824
825 if addr.port() == 0 || addr.port() < 1024 {
827 return AddressValidationResult::Suspicious;
828 }
829
830 AddressValidationResult::Valid
831 }
832
833 fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
835 let octets = ipv4.octets();
836 if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
839 return true;
840 }
841
842 false
845 }
846
847 fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
849 let segments = ipv6.segments();
850 if segments.iter().all(|&s| s == segments[0]) {
852 return true;
853 }
854
855 false
856 }
857
858 fn cleanup_address_cache(&mut self, _now: Instant) {
860 if self.address_validation_cache.len() > 500 {
863 let keys_to_remove: Vec<_> = self
864 .address_validation_cache
865 .keys()
866 .take(self.address_validation_cache.len() / 2)
867 .copied()
868 .collect();
869 for key in keys_to_remove {
870 self.address_validation_cache.remove(&key);
871 }
872 }
873 }
874
875 fn validate_punch_me_now_frame(
883 &mut self,
884 frame: &crate::frame::PunchMeNow,
885 source_addr: SocketAddr,
886 peer_id: [u8; 32],
887 now: Instant,
888 ) -> Result<(), NatTraversalError> {
889 if self.is_coordination_rate_limited(now) {
891 debug!(
892 "PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}",
893 hex::encode(&peer_id[..8])
894 );
895 return Err(NatTraversalError::RateLimitExceeded);
896 }
897 let addr_validation = self.validate_address(frame.address, now);
899 match addr_validation {
900 AddressValidationResult::Invalid => {
901 debug!(
902 "PUNCH_ME_NOW frame rejected: invalid address {:?} from peer {:?}",
903 frame.address,
904 hex::encode(&peer_id[..8])
905 );
906 return Err(NatTraversalError::InvalidAddress);
907 }
908 AddressValidationResult::Suspicious => {
909 debug!(
910 "PUNCH_ME_NOW frame rejected: suspicious address {:?} from peer {:?}",
911 frame.address,
912 hex::encode(&peer_id[..8])
913 );
914 return Err(NatTraversalError::SuspiciousCoordination);
915 }
916 AddressValidationResult::Valid => {
917 }
919 }
920
921 if !self.validate_address_consistency(frame.address, source_addr) {
924 debug!(
925 "PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
926 frame.address, source_addr
927 );
928 return Err(NatTraversalError::SuspiciousCoordination);
929 }
930
931 if !self.validate_coordination_parameters(frame) {
933 debug!(
934 "PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
935 hex::encode(&peer_id[..8])
936 );
937 return Err(NatTraversalError::SuspiciousCoordination);
938 }
939
940 if let Some(target_peer_id) = frame.target_peer_id {
942 if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
943 debug!(
944 "PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
945 hex::encode(&peer_id[..8]),
946 hex::encode(&target_peer_id[..8])
947 );
948 return Err(NatTraversalError::SuspiciousCoordination);
949 }
950 }
951
952 if !self.validate_resource_limits(frame) {
954 debug!(
955 "PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
956 hex::encode(&peer_id[..8])
957 );
958 return Err(NatTraversalError::ResourceLimitExceeded);
959 }
960
961 debug!(
962 "PUNCH_ME_NOW frame validation passed for peer {:?}",
963 hex::encode(&peer_id[..8])
964 );
965 Ok(())
966 }
967
968 fn validate_address_consistency(
973 &self,
974 claimed_addr: SocketAddr,
975 observed_addr: SocketAddr,
976 ) -> bool {
977 match (claimed_addr.ip(), observed_addr.ip()) {
981 (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
982 if claimed_ip == observed_ip {
984 return true;
985 }
986
987 if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
989 return true;
990 }
991
992 !claimed_ip.is_private() && !observed_ip.is_private()
995 }
996 (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
997 claimed_ip == observed_ip || self.are_in_same_prefix_v6(claimed_ip, observed_ip)
999 }
1000 _ => {
1001 false
1003 }
1004 }
1005 }
1006
1007 fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
1009 let ip1_octets = ip1.octets();
1011 let ip2_octets = ip2.octets();
1012 if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
1014 return true;
1015 }
1016
1017 if ip1_octets[0] == 172
1019 && ip2_octets[0] == 172
1020 && (16..=31).contains(&ip1_octets[1])
1021 && (16..=31).contains(&ip2_octets[1])
1022 {
1023 return true;
1024 }
1025
1026 if ip1_octets[0] == 192
1028 && ip1_octets[1] == 168
1029 && ip2_octets[0] == 192
1030 && ip2_octets[1] == 168
1031 {
1032 return true;
1033 }
1034
1035 false
1036 }
1037
1038 fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1040 let segments1 = ip1.segments();
1042 let segments2 = ip2.segments();
1043 segments1[0] == segments2[0]
1044 && segments1[1] == segments2[1]
1045 && segments1[2] == segments2[2]
1046 && segments1[3] == segments2[3]
1047 }
1048
1049 fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1051 if frame.round.into_inner() > 1000000 {
1053 return false;
1054 }
1055 if frame.paired_with_sequence_number.into_inner() > 10000 {
1057 return false;
1058 }
1059
1060 match frame.address.ip() {
1062 IpAddr::V4(ipv4) => {
1063 !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1065 }
1066 IpAddr::V6(ipv6) => {
1067 !ipv6.is_unspecified() && !ipv6.is_multicast()
1069 }
1070 }
1071 }
1072
1073 fn validate_target_peer_request(
1075 &self,
1076 requesting_peer: [u8; 32],
1077 target_peer: [u8; 32],
1078 _frame: &crate::frame::PunchMeNow,
1079 ) -> bool {
1080 if requesting_peer == target_peer {
1082 return false;
1083 }
1084 true
1090 }
1091
1092 fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1094 self.coordination_requests.len() < self.max_coordination_per_window as usize
1102 }
1103}
1104
1105impl AdaptiveTimeoutState {
1106 pub(crate) fn new() -> Self {
1108 let base_timeout = Duration::from_millis(1000); Self {
1110 current_timeout: base_timeout,
1111 min_timeout: Duration::from_millis(100),
1112 max_timeout: Duration::from_secs(30),
1113 base_timeout,
1114 backoff_multiplier: 1.0,
1115 max_backoff_multiplier: 8.0,
1116 jitter_factor: 0.1, srtt: None,
1118 rttvar: None,
1119 last_rtt: None,
1120 consecutive_timeouts: 0,
1121 successful_responses: 0,
1122 }
1123 }
1124 fn update_success(&mut self, rtt: Duration) {
1126 self.last_rtt = Some(rtt);
1127 self.successful_responses += 1;
1128 self.consecutive_timeouts = 0;
1129 match self.srtt {
1131 None => {
1132 self.srtt = Some(rtt);
1133 self.rttvar = Some(rtt / 2);
1134 }
1135 Some(srtt) => {
1136 let rttvar = self.rttvar.unwrap_or(rtt / 2);
1137 let abs_diff = rtt.abs_diff(srtt);
1138
1139 self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1140 self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1141 }
1142 }
1143
1144 self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1146
1147 self.calculate_current_timeout();
1149 }
1150
1151 fn update_timeout(&mut self) {
1153 self.consecutive_timeouts += 1;
1154 self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1156
1157 self.calculate_current_timeout();
1159 }
1160
1161 fn calculate_current_timeout(&mut self) {
1163 let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1164 srtt + rttvar * 4
1166 } else {
1167 self.base_timeout
1168 };
1169 let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1171
1172 let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1174 let timeout = timeout.mul_f64(jitter);
1175
1176 self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1178 }
1179
1180 fn get_timeout(&self) -> Duration {
1182 self.current_timeout
1183 }
1184 fn should_retry(&self, max_retries: u32) -> bool {
1186 self.consecutive_timeouts < max_retries
1187 }
1188 fn get_retry_delay(&self) -> Duration {
1190 let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1191 delay.clamp(self.min_timeout, self.max_timeout)
1192 }
1193}
1194#[derive(Debug)]
1196#[allow(dead_code)]
1197pub(super) struct ResourceManagementConfig {
1198 max_active_validations: usize,
1200 max_local_candidates: usize,
1202 max_remote_candidates: usize,
1204 max_candidate_pairs: usize,
1206 max_coordination_history: usize,
1208 cleanup_interval: Duration,
1210 candidate_timeout: Duration,
1212 validation_timeout: Duration,
1214 coordination_timeout: Duration,
1216 memory_pressure_threshold: f64,
1218 aggressive_cleanup_threshold: f64,
1220}
1221#[derive(Debug, Default)]
1223#[allow(dead_code)]
1224pub(super) struct ResourceStats {
1225 active_validations: usize,
1227 local_candidates: usize,
1229 remote_candidates: usize,
1231 candidate_pairs: usize,
1233 peak_memory_usage: usize,
1235 cleanup_operations: u64,
1237 resources_cleaned: u64,
1239 allocation_failures: u64,
1241 last_cleanup: Option<Instant>,
1243 memory_pressure: f64,
1245}
1246#[derive(Debug)]
1248pub(super) struct ResourceCleanupCoordinator {
1249 config: ResourceManagementConfig,
1251 stats: ResourceStats,
1253 last_cleanup: Option<Instant>,
1255 cleanup_counter: u64,
1257 shutdown_requested: bool,
1259}
1260impl ResourceManagementConfig {
1261 fn new() -> Self {
1263 Self {
1264 max_active_validations: 100,
1265 max_local_candidates: 50,
1266 max_remote_candidates: 100,
1267 max_candidate_pairs: 200,
1268 max_coordination_history: 10,
1269 cleanup_interval: Duration::from_secs(30),
1270 candidate_timeout: Duration::from_secs(300), validation_timeout: Duration::from_secs(30),
1272 coordination_timeout: Duration::from_secs(60),
1273 memory_pressure_threshold: 0.75,
1274 aggressive_cleanup_threshold: 0.90,
1275 }
1276 }
1277 #[cfg(feature = "low_memory")]
1279 fn low_memory() -> Self {
1280 Self {
1281 max_active_validations: 25,
1282 max_local_candidates: 10,
1283 max_remote_candidates: 25,
1284 max_candidate_pairs: 50,
1285 max_coordination_history: 3,
1286 cleanup_interval: Duration::from_secs(15),
1287 candidate_timeout: Duration::from_secs(180), validation_timeout: Duration::from_secs(20),
1289 coordination_timeout: Duration::from_secs(30),
1290 memory_pressure_threshold: 0.60,
1291 aggressive_cleanup_threshold: 0.80,
1292 }
1293 }
1294}
1295#[allow(dead_code)]
1296impl ResourceCleanupCoordinator {
1297 fn new() -> Self {
1299 Self {
1300 config: ResourceManagementConfig::new(),
1301 stats: ResourceStats::default(),
1302 last_cleanup: None,
1303 cleanup_counter: 0,
1304 shutdown_requested: false,
1305 }
1306 }
1307 #[cfg(feature = "low_memory")]
1309 fn low_memory() -> Self {
1310 Self {
1311 config: ResourceManagementConfig::low_memory(),
1312 stats: ResourceStats::default(),
1313 last_cleanup: None,
1314 cleanup_counter: 0,
1315 shutdown_requested: false,
1316 }
1317 }
1318 fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1320 state.active_validations.len() > self.config.max_active_validations
1321 || state.local_candidates.len() > self.config.max_local_candidates
1322 || state.remote_candidates.len() > self.config.max_remote_candidates
1323 || state.candidate_pairs.len() > self.config.max_candidate_pairs
1324 }
1325 fn calculate_memory_pressure(
1327 &mut self,
1328 active_validations_len: usize,
1329 local_candidates_len: usize,
1330 remote_candidates_len: usize,
1331 candidate_pairs_len: usize,
1332 ) -> f64 {
1333 let total_limit = self.config.max_active_validations
1334 + self.config.max_local_candidates
1335 + self.config.max_remote_candidates
1336 + self.config.max_candidate_pairs;
1337 let current_usage = active_validations_len
1338 + local_candidates_len
1339 + remote_candidates_len
1340 + candidate_pairs_len;
1341
1342 let pressure = current_usage as f64 / total_limit as f64;
1343 self.stats.memory_pressure = pressure;
1344 pressure
1345 }
1346
1347 fn should_cleanup(&self, now: Instant) -> bool {
1349 if self.shutdown_requested {
1350 return true;
1351 }
1352 if let Some(last_cleanup) = self.last_cleanup {
1354 if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1355 return true;
1356 }
1357 } else {
1358 return true; }
1360
1361 if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1363 return true;
1364 }
1365
1366 false
1367 }
1368
1369 fn cleanup_expired_resources(
1371 &mut self,
1372 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1373 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1374 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1375 candidate_pairs: &mut Vec<CandidatePair>,
1376 coordination: &mut Option<CoordinationState>,
1377 now: Instant,
1378 ) -> u64 {
1379 let mut cleaned = 0;
1380 cleaned += self.cleanup_expired_validations(active_validations, now);
1382
1383 cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1385
1386 cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1388
1389 cleaned += self.cleanup_old_coordination(coordination, now);
1391
1392 self.stats.cleanup_operations += 1;
1394 self.stats.resources_cleaned += cleaned;
1395 self.last_cleanup = Some(now);
1396 self.cleanup_counter += 1;
1397
1398 debug!("Cleaned up {} expired resources", cleaned);
1399 cleaned
1400 }
1401
1402 fn cleanup_expired_validations(
1404 &mut self,
1405 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1406 now: Instant,
1407 ) -> u64 {
1408 let mut cleaned = 0;
1409 let validation_timeout = self.config.validation_timeout;
1410 active_validations.retain(|_addr, validation| {
1411 let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1412 if is_expired {
1413 cleaned += 1;
1414 trace!("Cleaned up expired validation for {:?}", _addr);
1415 }
1416 !is_expired
1417 });
1418
1419 cleaned
1420 }
1421
1422 fn cleanup_stale_candidates(
1424 &mut self,
1425 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1426 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1427 now: Instant,
1428 ) -> u64 {
1429 let mut cleaned = 0;
1430 let candidate_timeout = self.config.candidate_timeout;
1431 local_candidates.retain(|_seq, candidate| {
1433 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1434 || candidate.state == CandidateState::Failed
1435 || candidate.state == CandidateState::Removed;
1436 if is_stale {
1437 cleaned += 1;
1438 trace!("Cleaned up stale local candidate {:?}", candidate.address);
1439 }
1440 !is_stale
1441 });
1442
1443 remote_candidates.retain(|_seq, candidate| {
1445 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1446 || candidate.state == CandidateState::Failed
1447 || candidate.state == CandidateState::Removed;
1448 if is_stale {
1449 cleaned += 1;
1450 trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1451 }
1452 !is_stale
1453 });
1454
1455 cleaned
1456 }
1457
1458 fn cleanup_failed_pairs(
1460 &mut self,
1461 candidate_pairs: &mut Vec<CandidatePair>,
1462 now: Instant,
1463 ) -> u64 {
1464 let mut cleaned = 0;
1465 let pair_timeout = self.config.candidate_timeout;
1466 candidate_pairs.retain(|pair| {
1467 let is_stale = now.duration_since(pair.created_at) > pair_timeout
1468 || pair.state == PairState::Failed;
1469 if is_stale {
1470 cleaned += 1;
1471 trace!(
1472 "Cleaned up failed candidate pair {:?} -> {:?}",
1473 pair.local_addr, pair.remote_addr
1474 );
1475 }
1476 !is_stale
1477 });
1478
1479 cleaned
1480 }
1481
1482 fn cleanup_old_coordination(
1484 &mut self,
1485 coordination: &mut Option<CoordinationState>,
1486 now: Instant,
1487 ) -> u64 {
1488 let mut cleaned = 0;
1489 if let Some(coord) = coordination {
1490 let is_expired =
1491 now.duration_since(coord.round_start) > self.config.coordination_timeout;
1492 let is_failed = coord.state == CoordinationPhase::Failed;
1493
1494 if is_expired || is_failed {
1495 let round = coord.round;
1496 *coordination = None;
1497 cleaned += 1;
1498 trace!("Cleaned up old coordination state for round {}", round);
1499 }
1500 }
1501
1502 cleaned
1503 }
1504
1505 fn aggressive_cleanup(
1507 &mut self,
1508 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1509 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1510 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1511 candidate_pairs: &mut Vec<CandidatePair>,
1512 now: Instant,
1513 ) -> u64 {
1514 let mut cleaned = 0;
1515 let aggressive_timeout = self.config.candidate_timeout / 2;
1517
1518 local_candidates.retain(|_seq, candidate| {
1520 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1521 && candidate.state != CandidateState::Failed;
1522 if !keep {
1523 cleaned += 1;
1524 }
1525 keep
1526 });
1527
1528 remote_candidates.retain(|_seq, candidate| {
1529 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1530 && candidate.state != CandidateState::Failed;
1531 if !keep {
1532 cleaned += 1;
1533 }
1534 keep
1535 });
1536
1537 candidate_pairs.retain(|pair| {
1539 let keep = pair.state != PairState::Waiting
1540 || now.duration_since(pair.created_at) <= aggressive_timeout;
1541 if !keep {
1542 cleaned += 1;
1543 }
1544 keep
1545 });
1546
1547 active_validations.retain(|_addr, validation| {
1549 let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1550 if !keep {
1551 cleaned += 1;
1552 }
1553 keep
1554 });
1555
1556 warn!(
1557 "Aggressive cleanup removed {} resources due to memory pressure",
1558 cleaned
1559 );
1560 cleaned
1561 }
1562
1563 fn request_shutdown(&mut self) {
1565 self.shutdown_requested = true;
1566 debug!("Resource cleanup coordinator shutdown requested");
1567 }
1568 fn shutdown_cleanup(
1570 &mut self,
1571 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1572 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1573 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1574 candidate_pairs: &mut Vec<CandidatePair>,
1575 coordination: &mut Option<CoordinationState>,
1576 ) -> u64 {
1577 let mut cleaned = 0;
1578 cleaned += active_validations.len() as u64;
1580 active_validations.clear();
1581
1582 cleaned += local_candidates.len() as u64;
1583 local_candidates.clear();
1584
1585 cleaned += remote_candidates.len() as u64;
1586 remote_candidates.clear();
1587
1588 cleaned += candidate_pairs.len() as u64;
1589 candidate_pairs.clear();
1590
1591 if coordination.is_some() {
1592 *coordination = None;
1593 cleaned += 1;
1594 }
1595
1596 info!("Shutdown cleanup removed {} resources", cleaned);
1597 cleaned
1598 }
1599
1600 fn get_resource_stats(&self) -> &ResourceStats {
1602 &self.stats
1603 }
1604 fn update_stats(
1606 &mut self,
1607 active_validations_len: usize,
1608 local_candidates_len: usize,
1609 remote_candidates_len: usize,
1610 candidate_pairs_len: usize,
1611 ) {
1612 self.stats.active_validations = active_validations_len;
1613 self.stats.local_candidates = local_candidates_len;
1614 self.stats.remote_candidates = remote_candidates_len;
1615 self.stats.candidate_pairs = candidate_pairs_len;
1616 let current_usage = self.stats.active_validations
1618 + self.stats.local_candidates
1619 + self.stats.remote_candidates
1620 + self.stats.candidate_pairs;
1621
1622 if current_usage > self.stats.peak_memory_usage {
1623 self.stats.peak_memory_usage = current_usage;
1624 }
1625 }
1626
1627 pub(super) fn perform_cleanup(&mut self, now: Instant) {
1629 self.last_cleanup = Some(now);
1630 self.cleanup_counter += 1;
1631 self.stats.cleanup_operations += 1;
1633
1634 debug!("Performed resource cleanup #{}", self.cleanup_counter);
1635 }
1636}
1637
1638#[allow(dead_code)]
1639impl NetworkConditionMonitor {
1640 fn new() -> Self {
1642 Self {
1643 rtt_samples: VecDeque::new(),
1644 max_samples: 20,
1645 packet_loss_rate: 0.0,
1646 congestion_window: 10,
1647 quality_score: 0.8, last_quality_update: Instant::now(),
1649 quality_update_interval: Duration::from_secs(10),
1650 timeout_stats: TimeoutStatistics::default(),
1651 }
1652 }
1653 fn record_success(&mut self, rtt: Duration, now: Instant) {
1655 self.rtt_samples.push_back(rtt);
1657 if self.rtt_samples.len() > self.max_samples {
1658 self.rtt_samples.pop_front();
1659 }
1660 self.timeout_stats.total_responses += 1;
1662 self.update_timeout_stats(now);
1663
1664 self.update_quality_score(now);
1666 }
1667
1668 fn record_timeout(&mut self, now: Instant) {
1670 self.timeout_stats.total_timeouts += 1;
1671 self.update_timeout_stats(now);
1672 self.update_quality_score(now);
1674 }
1675
1676 fn update_timeout_stats(&mut self, now: Instant) {
1678 let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1679 if total_attempts > 0 {
1680 self.timeout_stats.timeout_rate =
1681 self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1682 }
1683
1684 if !self.rtt_samples.is_empty() {
1686 let total_rtt: Duration = self.rtt_samples.iter().sum();
1687 self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1688 }
1689
1690 self.timeout_stats.last_update = Some(now);
1691 }
1692
1693 fn update_quality_score(&mut self, now: Instant) {
1695 if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1696 return;
1697 }
1698 let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1700 let rtt_factor = self.calculate_rtt_factor();
1701 let consistency_factor = self.calculate_consistency_factor();
1702
1703 let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1705
1706 self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1708 self.last_quality_update = now;
1709 }
1710
1711 fn calculate_rtt_factor(&self) -> f64 {
1713 if self.rtt_samples.is_empty() {
1714 return 0.5; }
1716 let avg_rtt = self.timeout_stats.avg_response_time;
1717
1718 let rtt_ms = avg_rtt.as_millis() as f64;
1720 let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1721 factor.clamp(0.0, 1.0)
1722 }
1723
1724 fn calculate_consistency_factor(&self) -> f64 {
1726 if self.rtt_samples.len() < 3 {
1727 return 0.5; }
1729 let mean_rtt = self.timeout_stats.avg_response_time;
1731 let variance: f64 = self
1732 .rtt_samples
1733 .iter()
1734 .map(|rtt| {
1735 let diff = (*rtt).abs_diff(mean_rtt);
1736 diff.as_millis() as f64
1737 })
1738 .map(|diff| diff * diff)
1739 .sum::<f64>()
1740 / self.rtt_samples.len() as f64;
1741
1742 let std_dev = variance.sqrt();
1743
1744 let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1746 consistency.clamp(0.0, 1.0)
1747 }
1748
1749 fn get_quality_score(&self) -> f64 {
1751 self.quality_score
1752 }
1753 fn get_estimated_rtt(&self) -> Option<Duration> {
1755 if self.rtt_samples.is_empty() {
1756 return None;
1757 }
1758 Some(self.timeout_stats.avg_response_time)
1759 }
1760
1761 fn is_suitable_for_coordination(&self) -> bool {
1763 self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1765 }
1766 fn get_packet_loss_rate(&self) -> f64 {
1768 self.packet_loss_rate
1769 }
1770
1771 fn get_timeout_multiplier(&self) -> f64 {
1773 let base_multiplier = 1.0;
1774
1775 let quality_multiplier = if self.quality_score < 0.3 {
1777 2.0 } else if self.quality_score > 0.8 {
1779 0.8 } else {
1781 1.0 };
1783
1784 let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1786
1787 base_multiplier * quality_multiplier * loss_multiplier
1788 }
1789
1790 fn cleanup(&mut self, now: Instant) {
1792 let _cutoff_time = now - Duration::from_secs(60);
1794
1795 if let Some(last_update) = self.timeout_stats.last_update {
1797 if now.duration_since(last_update) > Duration::from_secs(300) {
1798 self.timeout_stats = TimeoutStatistics::default();
1799 }
1800 }
1801 }
1802}
1803
1804#[allow(dead_code)]
1805impl NatTraversalState {
1806 pub(super) fn new(
1808 role: NatTraversalRole,
1809 max_candidates: u32,
1810 coordination_timeout: Duration,
1811 ) -> Self {
1812 let bootstrap_coordinator = if matches!(role, NatTraversalRole::Bootstrap) {
1813 Some(BootstrapCoordinator::new(BootstrapConfig::default()))
1814 } else {
1815 None
1816 };
1817 Self {
1818 role,
1819 local_candidates: HashMap::new(),
1820 remote_candidates: HashMap::new(),
1821 candidate_pairs: Vec::new(),
1822 pair_index: HashMap::new(),
1823 active_validations: HashMap::new(),
1824 coordination: None,
1825 next_sequence: VarInt::from_u32(1),
1826 max_candidates,
1827 coordination_timeout,
1828 stats: NatTraversalStats::default(),
1829 security_state: SecurityValidationState::new(),
1830 network_monitor: NetworkConditionMonitor::new(),
1831 resource_manager: ResourceCleanupCoordinator::new(),
1832 bootstrap_coordinator,
1833 }
1834 }
1835
1836 pub(super) fn add_remote_candidate(
1838 &mut self,
1839 sequence: VarInt,
1840 address: SocketAddr,
1841 priority: VarInt,
1842 now: Instant,
1843 ) -> Result<(), NatTraversalError> {
1844 if self.should_reject_new_resources(now) {
1846 debug!(
1847 "Rejecting new candidate due to resource limits: {}",
1848 address
1849 );
1850 return Err(NatTraversalError::ResourceLimitExceeded);
1851 }
1852 if self.security_state.is_candidate_rate_limited(now) {
1854 self.stats.rate_limit_violations += 1;
1855 debug!("Rate limit exceeded for candidate addition: {}", address);
1856 return Err(NatTraversalError::RateLimitExceeded);
1857 }
1858
1859 match self.security_state.validate_address(address, now) {
1861 AddressValidationResult::Invalid => {
1862 self.stats.invalid_address_rejections += 1;
1863 self.stats.security_rejections += 1;
1864 debug!("Invalid address rejected: {}", address);
1865 return Err(NatTraversalError::InvalidAddress);
1866 }
1867 AddressValidationResult::Suspicious => {
1868 self.stats.security_rejections += 1;
1869 debug!("Suspicious address rejected: {}", address);
1870 return Err(NatTraversalError::SecurityValidationFailed);
1871 }
1872 AddressValidationResult::Valid => {
1873 }
1875 }
1876
1877 if self.remote_candidates.len() >= self.max_candidates as usize {
1879 return Err(NatTraversalError::TooManyCandidates);
1880 }
1881
1882 if self
1884 .remote_candidates
1885 .values()
1886 .any(|c| c.address == address && c.state != CandidateState::Removed)
1887 {
1888 return Err(NatTraversalError::DuplicateAddress);
1889 }
1890
1891 let candidate = AddressCandidate {
1892 address,
1893 priority: priority.into_inner() as u32,
1894 source: CandidateSource::Peer,
1895 discovered_at: now,
1896 state: CandidateState::New,
1897 attempt_count: 0,
1898 last_attempt: None,
1899 };
1900
1901 self.remote_candidates.insert(sequence, candidate);
1902 self.stats.remote_candidates_received += 1;
1903
1904 trace!(
1905 "Added remote candidate: {} with priority {}",
1906 address, priority
1907 );
1908 Ok(())
1909 }
1910
1911 pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
1913 if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
1914 candidate.state = CandidateState::Removed;
1915 self.active_validations.remove(&candidate.address);
1917 true
1918 } else {
1919 false
1920 }
1921 }
1922
1923 #[allow(clippy::expect_used)]
1925 pub(super) fn add_local_candidate(
1926 &mut self,
1927 address: SocketAddr,
1928 source: CandidateSource,
1929 now: Instant,
1930 ) -> VarInt {
1931 let sequence = self.next_sequence;
1932 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
1933 .expect("sequence number overflow");
1934 let candidate_type = classify_candidate_type(source);
1936 let local_preference = self.calculate_local_preference(address);
1937 let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
1938
1939 let candidate = AddressCandidate {
1940 address,
1941 priority,
1942 source,
1943 discovered_at: now,
1944 state: CandidateState::New,
1945 attempt_count: 0,
1946 last_attempt: None,
1947 };
1948
1949 self.local_candidates.insert(sequence, candidate);
1950 self.stats.local_candidates_sent += 1;
1951
1952 self.generate_candidate_pairs(now);
1954
1955 sequence
1956 }
1957
1958 fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
1960 match addr {
1961 SocketAddr::V4(v4) => {
1962 if v4.ip().is_loopback() {
1963 0 } else if v4.ip().is_private() {
1965 65000 } else {
1967 32000 }
1969 }
1970 SocketAddr::V6(v6) => {
1971 if v6.ip().is_loopback() {
1972 0
1973 } else if v6.ip().segments()[0] == 0xfe80 {
1974 30000 } else {
1977 50000 }
1979 }
1980 }
1981 }
1982 pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
1984 self.candidate_pairs.clear();
1985 self.pair_index.clear();
1986 let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
1988 self.candidate_pairs.reserve(estimated_capacity);
1989 self.pair_index.reserve(estimated_capacity);
1990
1991 let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
1993
1994 for local_candidate in self.local_candidates.values() {
1995 if local_candidate.state == CandidateState::Removed {
1997 continue;
1998 }
1999
2000 let local_type = classify_candidate_type(local_candidate.source);
2002
2003 for (remote_seq, remote_candidate) in &self.remote_candidates {
2004 if remote_candidate.state == CandidateState::Removed {
2006 continue;
2007 }
2008
2009 let cache_key = (local_candidate.address, remote_candidate.address);
2011 let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
2012 are_candidates_compatible(local_candidate, remote_candidate)
2013 });
2014
2015 if !compatible {
2016 continue;
2017 }
2018
2019 let pair_priority =
2021 calculate_pair_priority(local_candidate.priority, remote_candidate.priority);
2022
2023 let remote_type = classify_candidate_type(remote_candidate.source);
2025 let pair_type = classify_pair_type(local_type, remote_type);
2026
2027 let pair = CandidatePair {
2028 remote_sequence: *remote_seq,
2029 local_addr: local_candidate.address,
2030 remote_addr: remote_candidate.address,
2031 priority: pair_priority,
2032 state: PairState::Waiting,
2033 pair_type,
2034 created_at: now,
2035 last_check: None,
2036 };
2037
2038 let index = self.candidate_pairs.len();
2040 self.pair_index.insert(remote_candidate.address, index);
2041 self.candidate_pairs.push(pair);
2042 }
2043 }
2044
2045 self.candidate_pairs
2047 .sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2048
2049 self.pair_index.clear();
2051 for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2052 self.pair_index.insert(pair.remote_addr, idx);
2053 }
2054
2055 trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2056 }
2057
2058 pub(super) fn get_next_validation_pairs(
2060 &mut self,
2061 max_concurrent: usize,
2062 ) -> Vec<&mut CandidatePair> {
2063 let mut result = Vec::with_capacity(max_concurrent);
2066 for pair in self.candidate_pairs.iter_mut() {
2067 if pair.state == PairState::Waiting {
2068 result.push(pair);
2069 if result.len() >= max_concurrent {
2070 break;
2071 }
2072 }
2073 }
2074
2075 result
2076 }
2077
2078 pub(super) fn find_pair_by_remote_addr(
2080 &mut self,
2081 addr: SocketAddr,
2082 ) -> Option<&mut CandidatePair> {
2083 if let Some(&index) = self.pair_index.get(&addr) {
2085 self.candidate_pairs.get_mut(index)
2086 } else {
2087 None
2088 }
2089 }
2090 pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2092 let (succeeded_type, succeeded_priority) = {
2094 if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2095 pair.state = PairState::Succeeded;
2096 (pair.pair_type, pair.priority)
2097 } else {
2098 return false;
2099 }
2100 };
2101 for other_pair in &mut self.candidate_pairs {
2103 if other_pair.pair_type == succeeded_type
2104 && other_pair.priority < succeeded_priority
2105 && other_pair.state == PairState::Waiting
2106 {
2107 other_pair.state = PairState::Frozen;
2108 }
2109 }
2110
2111 true
2112 }
2113
2114 pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2116 let mut best_ipv4: Option<&CandidatePair> = None;
2117 let mut best_ipv6: Option<&CandidatePair> = None;
2118 for pair in &self.candidate_pairs {
2119 if pair.state != PairState::Succeeded {
2120 continue;
2121 }
2122
2123 match pair.remote_addr {
2124 SocketAddr::V4(_) => {
2125 if best_ipv4.is_none_or(|best| pair.priority > best.priority) {
2126 best_ipv4 = Some(pair);
2127 }
2128 }
2129 SocketAddr::V6(_) => {
2130 if best_ipv6.is_none_or(|best| pair.priority > best.priority) {
2131 best_ipv6 = Some(pair);
2132 }
2133 }
2134 }
2135 }
2136
2137 let mut result = Vec::new();
2138 if let Some(pair) = best_ipv4 {
2139 result.push(pair);
2140 }
2141 if let Some(pair) = best_ipv6 {
2142 result.push(pair);
2143 }
2144 result
2145 }
2146
2147 pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2149 let mut candidates: Vec<_> = self
2150 .remote_candidates
2151 .iter()
2152 .filter(|(_, c)| c.state == CandidateState::New)
2153 .map(|(k, v)| (*k, v))
2154 .collect();
2155 candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2157 candidates
2158 }
2159
2160 pub(super) fn start_validation(
2162 &mut self,
2163 sequence: VarInt,
2164 challenge: u64,
2165 now: Instant,
2166 ) -> Result<(), NatTraversalError> {
2167 let candidate = self
2168 .remote_candidates
2169 .get_mut(&sequence)
2170 .ok_or(NatTraversalError::UnknownCandidate)?;
2171 if candidate.state != CandidateState::New {
2172 return Err(NatTraversalError::InvalidCandidateState);
2173 }
2174
2175 if Self::is_validation_suspicious(candidate, now) {
2177 self.stats.security_rejections += 1;
2178 debug!(
2179 "Suspicious validation attempt rejected for address {}",
2180 candidate.address
2181 );
2182 return Err(NatTraversalError::SecurityValidationFailed);
2183 }
2184
2185 if self.active_validations.len() >= 10 {
2187 debug!(
2188 "Too many concurrent validations, rejecting new validation for {}",
2189 candidate.address
2190 );
2191 return Err(NatTraversalError::SecurityValidationFailed);
2192 }
2193
2194 candidate.state = CandidateState::Validating;
2196 candidate.attempt_count += 1;
2197 candidate.last_attempt = Some(now);
2198
2199 let validation = PathValidationState {
2201 challenge,
2202 sent_at: now,
2203 retry_count: 0,
2204 max_retries: 3, coordination_round: self.coordination.as_ref().map(|c| c.round),
2206 timeout_state: AdaptiveTimeoutState::new(),
2207 last_retry_at: None,
2208 };
2209
2210 self.active_validations
2211 .insert(candidate.address, validation);
2212 trace!(
2213 "Started validation for candidate {} with challenge {}",
2214 candidate.address, challenge
2215 );
2216 Ok(())
2217 }
2218
2219 fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2221 if candidate.attempt_count > 10 {
2223 return true;
2224 }
2225 if let Some(last_attempt) = candidate.last_attempt {
2227 let time_since_last = now.duration_since(last_attempt);
2228 if time_since_last < Duration::from_millis(100) {
2229 return true; }
2231 }
2232
2233 if candidate.state == CandidateState::Failed {
2235 let time_since_discovery = now.duration_since(candidate.discovered_at);
2236 if time_since_discovery < Duration::from_secs(60) {
2237 return true; }
2239 }
2240
2241 false
2242 }
2243
2244 pub(super) fn handle_validation_success(
2246 &mut self,
2247 remote_addr: SocketAddr,
2248 challenge: u64,
2249 now: Instant,
2250 ) -> Result<VarInt, NatTraversalError> {
2251 let sequence = self
2253 .remote_candidates
2254 .iter()
2255 .find(|(_, c)| c.address == remote_addr)
2256 .map(|(seq, _)| *seq)
2257 .ok_or(NatTraversalError::UnknownCandidate)?;
2258 let validation = self
2260 .active_validations
2261 .get_mut(&remote_addr)
2262 .ok_or(NatTraversalError::NoActiveValidation)?;
2263
2264 if validation.challenge != challenge {
2265 return Err(NatTraversalError::ChallengeMismatch);
2266 }
2267
2268 let rtt = now.duration_since(validation.sent_at);
2270 validation.timeout_state.update_success(rtt);
2271
2272 self.network_monitor.record_success(rtt, now);
2274
2275 let candidate = self
2277 .remote_candidates
2278 .get_mut(&sequence)
2279 .ok_or(NatTraversalError::UnknownCandidate)?;
2280
2281 candidate.state = CandidateState::Valid;
2282 self.active_validations.remove(&remote_addr);
2283 self.stats.validations_succeeded += 1;
2284
2285 trace!(
2286 "Validation successful for {} with RTT {:?}",
2287 remote_addr, rtt
2288 );
2289 Ok(sequence)
2290 }
2291
2292 pub(super) fn start_coordination_round(
2294 &mut self,
2295 targets: Vec<PunchTarget>,
2296 now: Instant,
2297 ) -> Result<VarInt, NatTraversalError> {
2298 if self.security_state.is_coordination_rate_limited(now) {
2300 self.stats.rate_limit_violations += 1;
2301 debug!(
2302 "Rate limit exceeded for coordination request with {} targets",
2303 targets.len()
2304 );
2305 return Err(NatTraversalError::RateLimitExceeded);
2306 }
2307 if self.is_coordination_suspicious(&targets, now) {
2309 self.stats.suspicious_coordination_attempts += 1;
2310 self.stats.security_rejections += 1;
2311 debug!(
2312 "Suspicious coordination request rejected with {} targets",
2313 targets.len()
2314 );
2315 return Err(NatTraversalError::SuspiciousCoordination);
2316 }
2317
2318 for target in &targets {
2320 match self
2321 .security_state
2322 .validate_address(target.remote_addr, now)
2323 {
2324 AddressValidationResult::Invalid => {
2325 self.stats.invalid_address_rejections += 1;
2326 self.stats.security_rejections += 1;
2327 debug!(
2328 "Invalid target address in coordination: {}",
2329 target.remote_addr
2330 );
2331 return Err(NatTraversalError::InvalidAddress);
2332 }
2333 AddressValidationResult::Suspicious => {
2334 self.stats.security_rejections += 1;
2335 debug!(
2336 "Suspicious target address in coordination: {}",
2337 target.remote_addr
2338 );
2339 return Err(NatTraversalError::SecurityValidationFailed);
2340 }
2341 AddressValidationResult::Valid => {
2342 }
2344 }
2345 }
2346
2347 let round = self.next_sequence;
2348 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2349 .expect("sequence number overflow");
2350
2351 let coordination_grace = Duration::from_millis(500); let punch_start = now + coordination_grace;
2354
2355 self.coordination = Some(CoordinationState {
2356 round,
2357 punch_targets: targets,
2358 round_start: now,
2359 punch_start,
2360 round_duration: self.coordination_timeout,
2361 state: CoordinationPhase::Requesting,
2362 punch_request_sent: false,
2363 peer_punch_received: false,
2364 retry_count: 0,
2365 max_retries: 3,
2366 timeout_state: AdaptiveTimeoutState::new(),
2367 last_retry_at: None,
2368 });
2369
2370 self.stats.coordination_rounds += 1;
2371 trace!(
2372 "Started coordination round {} with {} targets",
2373 round,
2374 self.coordination
2375 .as_ref()
2376 .map(|c| c.punch_targets.len())
2377 .unwrap_or(0)
2378 );
2379 Ok(round)
2380 }
2381
2382 fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2384 if targets.len() > 20 {
2386 return true;
2387 }
2388 let mut seen_addresses = std::collections::HashSet::new();
2390 for target in targets {
2391 if !seen_addresses.insert(target.remote_addr) {
2392 return true; }
2394 }
2395
2396 if targets.len() > 5 {
2398 let mut ipv4_addresses: Vec<_> = targets
2400 .iter()
2401 .filter_map(|t| match t.remote_addr.ip() {
2402 IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2403 _ => None,
2404 })
2405 .collect();
2406
2407 if ipv4_addresses.len() >= 3 {
2408 ipv4_addresses.sort();
2409 let mut sequential_count = 1;
2410 for i in 1..ipv4_addresses.len() {
2411 if ipv4_addresses[i] == ipv4_addresses[i - 1] + 1 {
2412 sequential_count += 1;
2413 if sequential_count >= 3 {
2414 return true; }
2416 } else {
2417 sequential_count = 1;
2418 }
2419 }
2420 }
2421 }
2422
2423 false
2424 }
2425
2426 pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2428 self.coordination.as_ref().map(|c| c.state)
2429 }
2430 pub(super) fn should_send_punch_request(&self) -> bool {
2432 if let Some(coord) = &self.coordination {
2433 coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2434 } else {
2435 false
2436 }
2437 }
2438 pub(super) fn mark_punch_request_sent(&mut self) {
2440 if let Some(coord) = &mut self.coordination {
2441 coord.punch_request_sent = true;
2442 coord.state = CoordinationPhase::Coordinating;
2443 trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2444 }
2445 }
2446 pub(super) fn handle_peer_punch_request(
2448 &mut self,
2449 peer_round: VarInt,
2450 now: Instant,
2451 ) -> Result<bool, NatTraversalError> {
2452 if self.is_peer_coordination_suspicious(peer_round, now) {
2454 self.stats.suspicious_coordination_attempts += 1;
2455 self.stats.security_rejections += 1;
2456 debug!(
2457 "Suspicious peer coordination request rejected for round {}",
2458 peer_round
2459 );
2460 return Err(NatTraversalError::SuspiciousCoordination);
2461 }
2462 if let Some(coord) = &mut self.coordination {
2463 if coord.round == peer_round {
2464 match coord.state {
2465 CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2466 coord.peer_punch_received = true;
2467 coord.state = CoordinationPhase::Preparing;
2468
2469 let network_rtt = self
2471 .network_monitor
2472 .get_estimated_rtt()
2473 .unwrap_or(Duration::from_millis(100));
2474 let quality_score = self.network_monitor.get_quality_score();
2475
2476 let base_grace = Duration::from_millis(150);
2478 let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2479 let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2480
2481 let adaptive_grace = Duration::from_millis(
2482 (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64,
2483 );
2484
2485 coord.punch_start = now + adaptive_grace;
2486
2487 trace!(
2488 "Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2489 adaptive_grace, network_rtt, quality_score
2490 );
2491 Ok(true)
2492 }
2493 CoordinationPhase::Preparing => {
2494 trace!("Peer coordination confirmed during preparation");
2496 Ok(true)
2497 }
2498 _ => {
2499 debug!(
2500 "Received coordination in unexpected phase: {:?}",
2501 coord.state
2502 );
2503 Ok(false)
2504 }
2505 }
2506 } else {
2507 debug!(
2508 "Received coordination for wrong round: {} vs {}",
2509 peer_round, coord.round
2510 );
2511 Ok(false)
2512 }
2513 } else {
2514 debug!("Received peer coordination but no active round");
2515 Ok(false)
2516 }
2517 }
2518
2519 fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2521 if peer_round.into_inner() == 0 {
2523 return true; }
2525 if let Some(coord) = &self.coordination {
2527 let our_round = coord.round.into_inner();
2528 let peer_round_num = peer_round.into_inner();
2529
2530 if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2532 return true;
2533 }
2534 }
2535
2536 false
2537 }
2538
2539 pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2541 if let Some(coord) = &self.coordination {
2542 match coord.state {
2543 CoordinationPhase::Preparing => now >= coord.punch_start,
2544 CoordinationPhase::Coordinating => {
2545 coord.peer_punch_received && now >= coord.punch_start
2547 }
2548 _ => false,
2549 }
2550 } else {
2551 false
2552 }
2553 }
2554 pub(super) fn start_punching_phase(&mut self, now: Instant) {
2556 if let Some(coord) = &mut self.coordination {
2557 coord.state = CoordinationPhase::Punching;
2558 let network_rtt = self
2560 .network_monitor
2561 .get_estimated_rtt()
2562 .unwrap_or(Duration::from_millis(100));
2563
2564 let jitter_ms: u64 = rand::random::<u64>() % 11;
2566 let jitter = Duration::from_millis(jitter_ms);
2567 let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2568
2569 coord.punch_start = transmission_time.max(now);
2571
2572 trace!(
2573 "Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2574 coord.punch_start, network_rtt, jitter
2575 );
2576 }
2577 }
2578
2579 pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2581 self.coordination
2582 .as_ref()
2583 .map(|c| c.punch_targets.as_slice())
2584 }
2585 pub(super) fn mark_coordination_validating(&mut self) {
2587 if let Some(coord) = &mut self.coordination {
2588 if coord.state == CoordinationPhase::Punching {
2589 coord.state = CoordinationPhase::Validating;
2590 trace!("Coordination moved to validation phase");
2591 }
2592 }
2593 }
2594 pub(super) fn handle_coordination_success(
2596 &mut self,
2597 remote_addr: SocketAddr,
2598 now: Instant,
2599 ) -> bool {
2600 if let Some(coord) = &mut self.coordination {
2601 let was_target = coord
2603 .punch_targets
2604 .iter()
2605 .any(|target| target.remote_addr == remote_addr);
2606 if was_target && coord.state == CoordinationPhase::Validating {
2607 let rtt = now.duration_since(coord.round_start);
2609 coord.timeout_state.update_success(rtt);
2610 self.network_monitor.record_success(rtt, now);
2611
2612 coord.state = CoordinationPhase::Succeeded;
2613 self.stats.direct_connections += 1;
2614 trace!(
2615 "Coordination succeeded via {} with RTT {:?}",
2616 remote_addr, rtt
2617 );
2618 true
2619 } else {
2620 false
2621 }
2622 } else {
2623 false
2624 }
2625 }
2626
2627 pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2629 if let Some(coord) = &mut self.coordination {
2630 coord.retry_count += 1;
2631 coord.timeout_state.update_timeout();
2632 self.network_monitor.record_timeout(now);
2633 if coord.timeout_state.should_retry(coord.max_retries)
2635 && self.network_monitor.is_suitable_for_coordination()
2636 {
2637 coord.state = CoordinationPhase::Requesting;
2639 coord.punch_request_sent = false;
2640 coord.peer_punch_received = false;
2641 coord.round_start = now;
2642 coord.last_retry_at = Some(now);
2643
2644 let retry_delay = coord.timeout_state.get_retry_delay();
2646
2647 let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2649 let adjusted_delay = Duration::from_millis(
2650 (retry_delay.as_millis() as f64 * quality_multiplier) as u64,
2651 );
2652
2653 coord.punch_start = now + adjusted_delay;
2654
2655 trace!(
2656 "Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2657 coord.round,
2658 coord.retry_count + 1,
2659 adjusted_delay,
2660 self.network_monitor.get_quality_score()
2661 );
2662 true
2663 } else {
2664 coord.state = CoordinationPhase::Failed;
2665 self.stats.coordination_failures += 1;
2666
2667 if !self.network_monitor.is_suitable_for_coordination() {
2668 trace!(
2669 "Coordination failed due to poor network conditions (quality: {:.2})",
2670 self.network_monitor.get_quality_score()
2671 );
2672 } else {
2673 trace!("Coordination failed after {} attempts", coord.retry_count);
2674 }
2675 false
2676 }
2677 } else {
2678 false
2679 }
2680 }
2681
2682 pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2684 if let Some(coord) = &mut self.coordination {
2685 let timeout = coord.timeout_state.get_timeout();
2686 let elapsed = now.duration_since(coord.round_start);
2687 if elapsed > timeout {
2688 trace!(
2689 "Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2690 coord.round, elapsed, timeout
2691 );
2692 self.handle_coordination_failure(now);
2693 true
2694 } else {
2695 false
2696 }
2697 } else {
2698 false
2699 }
2700 }
2701
2702 pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2704 let mut expired_validations = Vec::new();
2705 let mut retry_validations = Vec::new();
2706
2707 for (addr, validation) in &mut self.active_validations {
2708 let timeout = validation.timeout_state.get_timeout();
2709 let elapsed = now.duration_since(validation.sent_at);
2710
2711 if elapsed >= timeout {
2712 if validation
2713 .timeout_state
2714 .should_retry(validation.max_retries)
2715 {
2716 retry_validations.push(*addr);
2718 } else {
2719 expired_validations.push(*addr);
2721 }
2722 }
2723 }
2724
2725 for addr in retry_validations {
2727 if let Some(validation) = self.active_validations.get_mut(&addr) {
2728 validation.retry_count += 1;
2729 validation.sent_at = now;
2730 validation.last_retry_at = Some(now);
2731 validation.timeout_state.update_timeout();
2732
2733 trace!(
2734 "Retrying validation for {} (attempt {})",
2735 addr,
2736 validation.retry_count + 1
2737 );
2738 }
2739 }
2740
2741 for addr in &expired_validations {
2743 self.active_validations.remove(addr);
2744 self.network_monitor.record_timeout(now);
2745 trace!("Validation expired for {}", addr);
2746 }
2747
2748 expired_validations
2749 }
2750
2751 pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2753 let mut retry_addresses = Vec::new();
2754
2755 for (addr, validation) in &mut self.active_validations {
2757 let elapsed = now.duration_since(validation.sent_at);
2758 let timeout = validation.timeout_state.get_timeout();
2759
2760 if elapsed > timeout
2761 && validation
2762 .timeout_state
2763 .should_retry(validation.max_retries)
2764 {
2765 validation.retry_count += 1;
2767 validation.last_retry_at = Some(now);
2768 validation.sent_at = now; validation.timeout_state.update_timeout();
2770
2771 retry_addresses.push(*addr);
2772 trace!(
2773 "Scheduled retry {} for validation to {}",
2774 validation.retry_count, addr
2775 );
2776 }
2777 }
2778
2779 retry_addresses
2780 }
2781
2782 pub(super) fn update_network_conditions(&mut self, now: Instant) {
2784 self.network_monitor.cleanup(now);
2785
2786 let multiplier = self.network_monitor.get_timeout_multiplier();
2788
2789 for validation in self.active_validations.values_mut() {
2791 if multiplier > 1.5 {
2792 validation.timeout_state.backoff_multiplier =
2794 (validation.timeout_state.backoff_multiplier * 1.2)
2795 .min(validation.timeout_state.max_backoff_multiplier);
2796 } else if multiplier < 0.8 {
2797 validation.timeout_state.backoff_multiplier =
2799 (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2800 }
2801 }
2802 }
2803
2804 pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2806 if let Some(coord) = &self.coordination {
2807 if coord.retry_count > 0 {
2808 if let Some(last_retry) = coord.last_retry_at {
2809 let retry_delay = coord.timeout_state.get_retry_delay();
2810 return now.duration_since(last_retry) >= retry_delay;
2811 }
2812 }
2813 }
2814 false
2815 }
2816
2817 pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2819 self.resource_manager.update_stats(
2821 self.active_validations.len(),
2822 self.local_candidates.len(),
2823 self.remote_candidates.len(),
2824 self.candidate_pairs.len(),
2825 );
2826
2827 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2829 self.active_validations.len(),
2830 self.local_candidates.len(),
2831 self.remote_candidates.len(),
2832 self.candidate_pairs.len(),
2833 );
2834
2835 let mut cleaned = 0;
2837
2838 if self.resource_manager.should_cleanup(now) {
2839 cleaned += self.resource_manager.cleanup_expired_resources(
2840 &mut self.active_validations,
2841 &mut self.local_candidates,
2842 &mut self.remote_candidates,
2843 &mut self.candidate_pairs,
2844 &mut self.coordination,
2845 now,
2846 );
2847
2848 if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2850 cleaned += self.resource_manager.aggressive_cleanup(
2851 &mut self.active_validations,
2852 &mut self.local_candidates,
2853 &mut self.remote_candidates,
2854 &mut self.candidate_pairs,
2855 now,
2856 );
2857 }
2858 }
2859
2860 cleaned
2861 }
2862
2863 pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
2865 self.resource_manager.update_stats(
2867 self.active_validations.len(),
2868 self.local_candidates.len(),
2869 self.remote_candidates.len(),
2870 self.candidate_pairs.len(),
2871 );
2872 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2873 self.active_validations.len(),
2874 self.local_candidates.len(),
2875 self.remote_candidates.len(),
2876 self.candidate_pairs.len(),
2877 );
2878 if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
2880 self.resource_manager.stats.allocation_failures += 1;
2881 return true;
2882 }
2883
2884 if self.resource_manager.check_resource_limits(self) {
2886 self.resource_manager.stats.allocation_failures += 1;
2887 return true;
2888 }
2889
2890 false
2891 }
2892
2893 pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
2895 let mut next_timeout = None;
2896 if let Some(coord) = &self.coordination {
2898 match coord.state {
2899 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2900 let timeout_at = coord.round_start + self.coordination_timeout;
2901 next_timeout =
2902 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2903 }
2904 CoordinationPhase::Preparing => {
2905 next_timeout = Some(
2907 next_timeout
2908 .map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)),
2909 );
2910 }
2911 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2912 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2914 next_timeout =
2915 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2916 }
2917 _ => {}
2918 }
2919 }
2920
2921 for validation in self.active_validations.values() {
2923 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2924 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2925 }
2926
2927 if self.resource_manager.should_cleanup(now) {
2929 let cleanup_at = now + Duration::from_secs(1);
2931 next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
2932 }
2933
2934 next_timeout
2935 }
2936
2937 pub(super) fn handle_timeout(
2939 &mut self,
2940 now: Instant,
2941 ) -> Result<Vec<TimeoutAction>, NatTraversalError> {
2942 let mut actions = Vec::new();
2943 if let Some(coord) = &mut self.coordination {
2945 match coord.state {
2946 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2947 let timeout_at = coord.round_start + self.coordination_timeout;
2948 if now >= timeout_at {
2949 coord.retry_count += 1;
2950 if coord.retry_count >= coord.max_retries {
2951 debug!("Coordination failed after {} retries", coord.retry_count);
2952 coord.state = CoordinationPhase::Failed;
2953 actions.push(TimeoutAction::Failed);
2954 } else {
2955 debug!(
2956 "Coordination timeout, retrying ({}/{})",
2957 coord.retry_count, coord.max_retries
2958 );
2959 coord.state = CoordinationPhase::Requesting;
2960 coord.round_start = now;
2961 actions.push(TimeoutAction::RetryCoordination);
2962 }
2963 }
2964 }
2965 CoordinationPhase::Preparing => {
2966 if now >= coord.punch_start {
2968 debug!("Starting coordinated hole punching");
2969 coord.state = CoordinationPhase::Punching;
2970 actions.push(TimeoutAction::StartValidation);
2971 }
2972 }
2973 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2974 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2975 if now >= timeout_at {
2976 coord.retry_count += 1;
2977 if coord.retry_count >= coord.max_retries {
2978 debug!("Validation failed after {} retries", coord.retry_count);
2979 coord.state = CoordinationPhase::Failed;
2980 actions.push(TimeoutAction::Failed);
2981 } else {
2982 debug!(
2983 "Validation timeout, retrying ({}/{})",
2984 coord.retry_count, coord.max_retries
2985 );
2986 coord.state = CoordinationPhase::Punching;
2987 actions.push(TimeoutAction::StartValidation);
2988 }
2989 }
2990 }
2991 CoordinationPhase::Succeeded => {
2992 actions.push(TimeoutAction::Complete);
2993 }
2994 CoordinationPhase::Failed => {
2995 actions.push(TimeoutAction::Failed);
2996 }
2997 _ => {}
2998 }
2999 }
3000
3001 let mut expired_validations = Vec::new();
3003 for (addr, validation) in &mut self.active_validations {
3004 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
3005 if now >= timeout_at {
3006 validation.retry_count += 1;
3007 if validation.retry_count >= validation.max_retries {
3008 debug!("Path validation failed for {}: max retries exceeded", addr);
3009 expired_validations.push(*addr);
3010 } else {
3011 debug!(
3012 "Path validation timeout for {}, retrying ({}/{})",
3013 addr, validation.retry_count, validation.max_retries
3014 );
3015 validation.sent_at = now;
3016 validation.last_retry_at = Some(now);
3017 actions.push(TimeoutAction::StartValidation);
3018 }
3019 }
3020 }
3021
3022 for addr in expired_validations {
3024 self.active_validations.remove(&addr);
3025 }
3026
3027 if self.resource_manager.should_cleanup(now) {
3029 self.resource_manager.perform_cleanup(now);
3030 }
3031
3032 self.network_monitor.update_quality_score(now);
3034
3035 if self.coordination.is_none()
3037 && !self.local_candidates.is_empty()
3038 && !self.remote_candidates.is_empty()
3039 {
3040 actions.push(TimeoutAction::RetryDiscovery);
3041 }
3042
3043 Ok(actions)
3044 }
3045
3046 pub(super) fn handle_address_observation(
3051 &mut self,
3052 peer_id: [u8; 32],
3053 observed_address: SocketAddr,
3054 connection_id: crate::shared::ConnectionId,
3055 peer_role: NatTraversalRole,
3056 now: Instant,
3057 ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
3058 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3059 let connection_context = ConnectionContext {
3060 connection_id,
3061 original_destination: observed_address, peer_role,
3063 };
3064
3065 bootstrap_coordinator.observe_peer_address(
3067 peer_id,
3068 observed_address,
3069 connection_context,
3070 now,
3071 )?;
3072
3073 let sequence = self.next_sequence;
3075 self.next_sequence =
3076 VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3077
3078 let priority = VarInt::from_u32(100); let add_address_frame =
3080 bootstrap_coordinator.generate_add_address_frame(peer_id, sequence, priority);
3081
3082 Ok(add_address_frame)
3083 } else {
3084 Ok(None)
3086 }
3087 }
3088
3089 pub(super) fn handle_punch_me_now_frame(
3094 &mut self,
3095 from_peer: [u8; 32],
3096 source_addr: SocketAddr,
3097 frame: &crate::frame::PunchMeNow,
3098 now: Instant,
3099 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3100 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3101 bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3102 } else {
3103 Ok(None)
3105 }
3106 }
3107 pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3111 self.bootstrap_coordinator
3112 .as_ref()
3113 .and_then(|coord| coord.peer_index.get(&peer_id).map(|p| p.observed_addr))
3114 }
3115
3116 pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3118 debug!("Starting candidate discovery for NAT traversal");
3119 if self.local_candidates.is_empty() {
3121 debug!("Local candidates will be populated by discovery manager");
3124 }
3125
3126 Ok(())
3127 }
3128
3129 pub(super) fn queue_add_address_frame(
3131 &mut self,
3132 sequence: VarInt,
3133 address: SocketAddr,
3134 priority: u32,
3135 ) -> Result<(), NatTraversalError> {
3136 debug!(
3137 "Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3138 sequence, address, priority
3139 );
3140
3141 let candidate = AddressCandidate {
3143 address,
3144 priority,
3145 source: CandidateSource::Local,
3146 discovered_at: Instant::now(),
3147 state: CandidateState::New,
3148 attempt_count: 0,
3149 last_attempt: None,
3150 };
3151
3152 if !self.local_candidates.values().any(|c| c.address == address) {
3154 self.local_candidates.insert(sequence, candidate);
3155 }
3156
3157 Ok(())
3158 }
3159}
3160
3161#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3163#[allow(dead_code)]
3164pub(crate) enum NatTraversalError {
3165 TooManyCandidates,
3167 DuplicateAddress,
3169 UnknownCandidate,
3171 InvalidCandidateState,
3173 NoActiveValidation,
3175 ChallengeMismatch,
3177 NoActiveCoordination,
3179 SecurityValidationFailed,
3181 RateLimitExceeded,
3183 InvalidAddress,
3185 SuspiciousCoordination,
3187 ResourceLimitExceeded,
3189}
3190impl std::fmt::Display for NatTraversalError {
3191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3192 match self {
3193 Self::TooManyCandidates => write!(f, "too many candidates"),
3194 Self::DuplicateAddress => write!(f, "duplicate address"),
3195 Self::UnknownCandidate => write!(f, "unknown candidate"),
3196 Self::InvalidCandidateState => write!(f, "invalid candidate state"),
3197 Self::NoActiveValidation => write!(f, "no active validation"),
3198 Self::ChallengeMismatch => write!(f, "challenge mismatch"),
3199 Self::NoActiveCoordination => write!(f, "no active coordination"),
3200 Self::SecurityValidationFailed => write!(f, "security validation failed"),
3201 Self::RateLimitExceeded => write!(f, "rate limit exceeded"),
3202 Self::InvalidAddress => write!(f, "invalid address"),
3203 Self::SuspiciousCoordination => write!(f, "suspicious coordination request"),
3204 Self::ResourceLimitExceeded => write!(f, "resource limit exceeded"),
3205 }
3206 }
3207}
3208
3209impl std::error::Error for NatTraversalError {}
3210
3211#[derive(Debug, Clone)]
3213#[allow(dead_code)]
3214pub(crate) struct SecurityStats {
3215 pub total_security_rejections: u32,
3217 pub rate_limit_violations: u32,
3219 pub invalid_address_rejections: u32,
3221 pub suspicious_coordination_attempts: u32,
3223 pub active_validations: usize,
3225 pub cached_address_validations: usize,
3227 pub current_candidate_rate: usize,
3229 pub current_coordination_rate: usize,
3231}
3232#[derive(Debug)]
3237pub(crate) struct BootstrapCoordinator {
3238 address_observations: HashMap<SocketAddr, AddressObservation>,
3240 peer_index: HashMap<PeerId, ObservedPeer>,
3242 coordination_table: HashMap<VarInt, CoordinationEntry>,
3244 security_validator: SecurityValidationState,
3246 stats: BootstrapStats,
3248}
3249type PeerId = [u8; 32];
3252#[derive(Debug, Clone)]
3254struct ObservedPeer {
3255 observed_addr: SocketAddr,
3256}
3257
3258#[derive(Debug, Clone)]
3260struct CoordinationEntry {
3261 peer_b: Option<PeerId>,
3262 address_hint: SocketAddr,
3263}
3264#[derive(Debug, Clone)]
3266#[allow(dead_code)]
3267pub(crate) struct PeerObservationRecord {
3268 peer_id: PeerId,
3270 observed_address: SocketAddr,
3272 observed_at: Instant,
3274 connection_context: ConnectionContext,
3276 can_coordinate: bool,
3278 coordination_count: u32,
3280 success_rate: f64,
3282}
3283
3284#[derive(Debug, Clone)]
3286#[allow(dead_code)]
3287pub(crate) struct ConnectionContext {
3288 connection_id: ConnectionId,
3290 original_destination: SocketAddr,
3292 peer_role: NatTraversalRole,
3294 }
3296
3297#[derive(Debug, Clone)]
3301#[allow(dead_code)]
3302struct AddressObservation {
3303 address: SocketAddr,
3305 first_observed: Instant,
3307 observation_count: u32,
3309 validation_state: AddressValidationResult,
3311 associated_peers: Vec<PeerId>,
3313}
3314
3315#[derive(Debug, Clone, Default)]
3319pub(crate) struct BootstrapConfig {
3320 _unused: (),
3321}
3322#[derive(Debug, Clone, Default)]
3324pub(crate) struct BootstrapStats {
3325 total_observations: u64,
3327 total_coordinations: u64,
3329 successful_coordinations: u64,
3331 security_rejections: u64,
3333}
3334impl BootstrapCoordinator {
3336 pub(crate) fn new(_config: BootstrapConfig) -> Self {
3338 Self {
3339 address_observations: HashMap::new(),
3340 peer_index: HashMap::new(),
3341 coordination_table: HashMap::new(),
3342 security_validator: SecurityValidationState::new(),
3343 stats: BootstrapStats::default(),
3344 }
3345 }
3346 pub(crate) fn observe_peer_address(
3351 &mut self,
3352 peer_id: PeerId,
3353 observed_address: SocketAddr,
3354 _connection_context: ConnectionContext,
3355 now: Instant,
3356 ) -> Result<(), NatTraversalError> {
3357 match self
3359 .security_validator
3360 .validate_address(observed_address, now)
3361 {
3362 AddressValidationResult::Valid => {}
3363 AddressValidationResult::Invalid => {
3364 self.stats.security_rejections += 1;
3365 return Err(NatTraversalError::InvalidAddress);
3366 }
3367 AddressValidationResult::Suspicious => {
3368 self.stats.security_rejections += 1;
3369 return Err(NatTraversalError::SecurityValidationFailed);
3370 }
3371 }
3372
3373 if self.security_validator.is_candidate_rate_limited(now) {
3375 self.stats.security_rejections += 1;
3376 return Err(NatTraversalError::RateLimitExceeded);
3377 }
3378
3379 let observation = self
3381 .address_observations
3382 .entry(observed_address)
3383 .or_insert_with(|| AddressObservation {
3384 address: observed_address,
3385 first_observed: now,
3386 observation_count: 0,
3387 validation_state: AddressValidationResult::Valid,
3388 associated_peers: Vec::new(),
3389 });
3390
3391 observation.observation_count += 1;
3392 if !observation.associated_peers.contains(&peer_id) {
3393 observation.associated_peers.push(peer_id);
3394 }
3395
3396 self.peer_index.insert(
3398 peer_id,
3399 ObservedPeer {
3400 observed_addr: observed_address,
3401 },
3402 );
3403
3404 self.stats.total_observations += 1;
3406 debug!(
3409 "Observed peer {:?} at address {} (total observations: {})",
3410 peer_id, observed_address, self.stats.total_observations
3411 );
3412
3413 Ok(())
3414 }
3415
3416 pub(crate) fn generate_add_address_frame(
3421 &self,
3422 peer_id: PeerId,
3423 sequence: VarInt,
3424 priority: VarInt,
3425 ) -> Option<crate::frame::AddAddress> {
3426 let addr = self.peer_index.get(&peer_id)?.observed_addr;
3427 Some(crate::frame::AddAddress {
3428 sequence,
3429 address: addr,
3430 priority,
3431 })
3432 }
3433
3434 pub(crate) fn process_punch_me_now_frame(
3439 &mut self,
3440 from_peer: PeerId,
3441 source_addr: SocketAddr,
3442 frame: &crate::frame::PunchMeNow,
3443 now: Instant,
3444 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3445 if self
3447 .security_validator
3448 .is_adaptive_rate_limited(from_peer, now)
3449 {
3450 self.stats.security_rejections += 1;
3451 debug!(
3452 "PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3453 hex::encode(&from_peer[..8])
3454 );
3455 return Err(NatTraversalError::RateLimitExceeded);
3456 }
3457 self.security_validator
3459 .enhanced_address_validation(frame.address, source_addr, now)
3460 .inspect_err(|&e| {
3461 self.stats.security_rejections += 1;
3462 debug!(
3463 "PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3464 hex::encode(&from_peer[..8]),
3465 e
3466 );
3467 })?;
3468
3469 self.security_validator
3471 .validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3472 .inspect_err(|&e| {
3473 self.stats.security_rejections += 1;
3474 debug!(
3475 "PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3476 hex::encode(&from_peer[..8]),
3477 e
3478 );
3479 })?;
3480
3481 let _entry = self
3483 .coordination_table
3484 .entry(frame.round)
3485 .or_insert(CoordinationEntry {
3486 peer_b: frame.target_peer_id,
3487 address_hint: frame.address,
3488 });
3489 if let Some(peer_b) = frame.target_peer_id {
3491 if _entry.peer_b.is_none() {
3492 _entry.peer_b = Some(peer_b);
3493 }
3494 _entry.address_hint = frame.address;
3495 }
3496
3497 if let Some(_target_peer_id) = frame.target_peer_id {
3499 let coordination_frame = crate::frame::PunchMeNow {
3500 round: frame.round,
3501 paired_with_sequence_number: frame.paired_with_sequence_number,
3502 address: frame.address,
3503 target_peer_id: Some(from_peer),
3504 };
3505 self.stats.total_coordinations += 1;
3506 Ok(Some(coordination_frame))
3507 } else {
3508 self.stats.successful_coordinations += 1;
3510 Ok(None)
3511 }
3512 }
3513
3514 #[allow(dead_code)]
3520 pub(crate) fn cleanup_expired_sessions(&mut self, _now: Instant) {}
3521
3522 #[allow(dead_code)]
3527 pub(crate) fn poll_session_state_machine(&mut self, _now: Instant) -> Vec<()> {
3528 Vec::new()
3530 }
3531
3532 #[allow(dead_code)]
3536 fn cleanup_completed_sessions(&mut self, _now: Instant) {}
3537
3538 #[allow(dead_code)]
3543 fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
3544 let _ = peer_id;
3547 None
3548 }
3549 #[allow(dead_code)]
3570 pub(crate) fn get_peer_record(&self, _peer_id: PeerId) -> Option<&PeerObservationRecord> {
3571 None
3573 }
3574}
3575
3576#[cfg(test)]
3590mod tests {
3591 use super::*;
3592
3593 fn create_test_state(role: NatTraversalRole) -> NatTraversalState {
3594 NatTraversalState::new(
3595 role,
3596 10, Duration::from_secs(30), )
3599 }
3600
3601 #[test]
3602 fn test_add_quic_discovered_address() {
3603 let mut state = create_test_state(NatTraversalRole::Client);
3605 let now = Instant::now();
3606
3607 let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5678));
3609 let seq = state.add_local_candidate(
3610 discovered_addr,
3611 CandidateSource::Observed { by_node: None },
3612 now,
3613 );
3614
3615 assert_eq!(state.local_candidates.len(), 1);
3617 let candidate = state.local_candidates.get(&seq).unwrap();
3618 assert_eq!(candidate.address, discovered_addr);
3619 assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
3620 assert_eq!(candidate.state, CandidateState::New);
3621
3622 assert!(candidate.priority > 0);
3624 }
3625
3626 #[test]
3627 fn test_add_multiple_quic_discovered_addresses() {
3628 let mut state = create_test_state(NatTraversalRole::Client);
3630 let now = Instant::now();
3631
3632 let addrs = vec![
3633 SocketAddr::from(([1, 2, 3, 4], 5678)),
3634 SocketAddr::from(([5, 6, 7, 8], 9012)),
3635 SocketAddr::from(([2001, 0xdb8, 0, 0, 0, 0, 0, 1], 443)),
3636 ];
3637
3638 let mut sequences = Vec::new();
3639 for addr in &addrs {
3640 let seq =
3641 state.add_local_candidate(*addr, CandidateSource::Observed { by_node: None }, now);
3642 sequences.push(seq);
3643 }
3644
3645 assert_eq!(state.local_candidates.len(), 3);
3647
3648 for (seq, addr) in sequences.iter().zip(&addrs) {
3650 let candidate = state.local_candidates.get(seq).unwrap();
3651 assert_eq!(candidate.address, *addr);
3652 assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
3653 }
3654 }
3655
3656 #[test]
3657 fn test_quic_discovered_addresses_in_local_candidates() {
3658 let mut state = create_test_state(NatTraversalRole::Client);
3660 let now = Instant::now();
3661
3662 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
3664 let seq = state.add_local_candidate(addr, CandidateSource::Observed { by_node: None }, now);
3665
3666 assert!(state.local_candidates.contains_key(&seq));
3668 let candidate = state.local_candidates.get(&seq).unwrap();
3669 assert_eq!(candidate.address, addr);
3670
3671 assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
3673 }
3674
3675 #[test]
3676 fn test_quic_discovered_addresses_included_in_hole_punching() {
3677 let mut state = create_test_state(NatTraversalRole::Client);
3679 let now = Instant::now();
3680
3681 let local_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
3683 state.add_local_candidate(local_addr, CandidateSource::Observed { by_node: None }, now);
3684
3685 let remote_addr = SocketAddr::from(([1, 2, 3, 4], 6000));
3687 let priority = VarInt::from_u32(100);
3688 state
3689 .add_remote_candidate(VarInt::from_u32(1), remote_addr, priority, now)
3690 .expect("add remote candidate should succeed");
3691
3692 state.generate_candidate_pairs(now);
3694
3695 assert_eq!(state.candidate_pairs.len(), 1);
3697 let pair = &state.candidate_pairs[0];
3698 assert_eq!(pair.local_addr, local_addr);
3699 assert_eq!(pair.remote_addr, remote_addr);
3700 }
3701
3702 #[test]
3703 fn test_prioritize_quic_discovered_over_predicted() {
3704 let mut state = create_test_state(NatTraversalRole::Client);
3706 let now = Instant::now();
3707
3708 let predicted_addr = SocketAddr::from(([1, 2, 3, 4], 5000));
3710 let predicted_seq =
3711 state.add_local_candidate(predicted_addr, CandidateSource::Predicted, now);
3712
3713 let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5001));
3715 let discovered_seq = state.add_local_candidate(
3716 discovered_addr,
3717 CandidateSource::Observed { by_node: None },
3718 now,
3719 );
3720
3721 let predicted_priority = state.local_candidates.get(&predicted_seq).unwrap().priority;
3723 let discovered_priority = state
3724 .local_candidates
3725 .get(&discovered_seq)
3726 .unwrap()
3727 .priority;
3728
3729 assert!(discovered_priority >= predicted_priority);
3732 }
3733
3734 #[test]
3735 fn test_integration_with_nat_traversal_flow() {
3736 let mut state = create_test_state(NatTraversalRole::Client);
3738 let now = Instant::now();
3739
3740 let local_addr = SocketAddr::from(([192, 168, 1, 2], 5000));
3742 state.add_local_candidate(local_addr, CandidateSource::Local, now);
3743
3744 let discovered_addr = SocketAddr::from(([44, 55, 66, 77], 5000));
3745 state.add_local_candidate(
3746 discovered_addr,
3747 CandidateSource::Observed { by_node: None },
3748 now,
3749 );
3750
3751 let remote1 = SocketAddr::from(([93, 184, 215, 123], 6000));
3753 let remote2 = SocketAddr::from(([172, 217, 16, 34], 7000));
3754 let priority = VarInt::from_u32(100);
3755 state
3756 .add_remote_candidate(VarInt::from_u32(1), remote1, priority, now)
3757 .expect("add remote candidate should succeed");
3758 state
3759 .add_remote_candidate(VarInt::from_u32(2), remote2, priority, now)
3760 .expect("add remote candidate should succeed");
3761
3762 state.generate_candidate_pairs(now);
3764
3765 assert_eq!(state.candidate_pairs.len(), 4);
3767
3768 let discovered_pairs: Vec<_> = state
3770 .candidate_pairs
3771 .iter()
3772 .filter(|p| p.local_addr == discovered_addr)
3773 .collect();
3774 assert_eq!(discovered_pairs.len(), 2);
3775 }
3776}