1use std::{
9 collections::{HashMap, VecDeque},
10 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
11 time::Duration,
12};
13
14use crate::shared::ConnectionId;
15use tracing::{debug, info, trace, warn};
16
17use crate::{Instant, VarInt};
18
/// Aggregate NAT traversal state: candidate tables, candidate pairs,
/// in-flight path validations, the current coordination round, and the helper
/// subsystems (security checks, network monitoring, resource cleanup).
#[derive(Debug)]
pub(super) struct NatTraversalState {
    /// Local address candidates keyed by a `VarInt` (the cleanup code names
    /// this key `_seq`; presumably the candidate sequence number — confirm).
    pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
    /// Remote address candidates, keyed the same way as `local_candidates`.
    pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
    /// Candidate pairs; pruned by the resource manager's pair cleanup.
    pub(super) candidate_pairs: Vec<CandidatePair>,
    /// Socket address -> index lookup, presumably into `candidate_pairs`.
    /// TODO confirm which address is the key and who keeps it in sync.
    pub(super) pair_index: HashMap<SocketAddr, usize>,
    /// In-flight path validations keyed by socket address; the resource
    /// manager ages them out using `PathValidationState::sent_at`.
    pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
    /// Current coordination round, if one exists; cleared by the resource
    /// manager once it has expired or failed.
    pub(super) coordination: Option<CoordinationState>,
    /// Presumably the next sequence number to hand out — confirm at the
    /// allocation site (not in this section).
    pub(super) next_sequence: VarInt,
    /// Candidate limit (enforcement site is not in this section).
    pub(super) max_candidates: u32,
    /// Coordination timeout (usage site is not in this section).
    pub(super) coordination_timeout: Duration,
    /// Counters describing traversal activity.
    pub(super) stats: NatTraversalStats,
    /// Rate limiting and address validation state.
    pub(super) security_state: SecurityValidationState,
    /// Network condition tracking state.
    pub(super) network_monitor: NetworkConditionMonitor,
    /// Limits and bookkeeping used to clean up stale resources.
    pub(super) resource_manager: ResourceCleanupCoordinator,
    /// Optional bootstrap coordinator (type defined outside this section).
    pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
}
/// A single address candidate together with its validation bookkeeping.
#[derive(Debug, Clone)]
pub(super) struct AddressCandidate {
    /// Socket address of the candidate.
    pub(super) address: SocketAddr,
    /// Candidate priority (see `calculate_candidate_priority` for the
    /// formula this file provides).
    pub(super) priority: u32,
    /// How the candidate was learned.
    pub(super) source: CandidateSource,
    /// When the candidate was discovered; cleanup ages candidates from here.
    pub(super) discovered_at: Instant,
    /// Current lifecycle state; `Failed` and `Removed` candidates are purged
    /// by the stale-candidate cleanup.
    pub(super) state: CandidateState,
    /// Number of attempts made so far (update site not in this section).
    pub(super) attempt_count: u32,
    /// Time of the most recent attempt, if any.
    pub(super) last_attempt: Option<Instant>,
}
/// Where an address candidate came from.
///
/// `classify_candidate_type` maps these onto [`CandidateType`] values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateSource {
    /// Classified as a host candidate.
    Local,
    /// Classified as a server-reflexive candidate.
    Observed {
        /// Optional identifier of the observing node (presumably the node
        /// that reported the address — confirm).
        by_node: Option<VarInt>,
    },
    /// Classified as a peer-reflexive candidate.
    Peer,
    /// Classified as a server-reflexive candidate.
    Predicted,
}
/// Lifecycle state of an address candidate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateState {
    /// Presumably: not yet validated — confirm at the transition sites.
    New,
    /// Presumably: validation in progress.
    Validating,
    /// Presumably: validation succeeded.
    Valid,
    /// Candidates in this state are removed by both the regular and the
    /// aggressive cleanup passes.
    Failed,
    /// Candidates in this state are removed by the regular stale-candidate
    /// cleanup.
    Removed,
}
/// Bookkeeping for one in-flight path validation.
#[derive(Debug)]
#[allow(dead_code)]
pub(super) struct PathValidationState {
    /// Challenge value associated with this validation.
    pub(super) challenge: u64,
    /// When the validation was sent; cleanup expires entries based on this.
    pub(super) sent_at: Instant,
    /// Retries performed so far (update site not in this section).
    pub(super) retry_count: u32,
    /// Retry budget (enforcement site not in this section).
    pub(super) max_retries: u32,
    /// Coordination round this validation belongs to, if any.
    pub(super) coordination_round: Option<VarInt>,
    /// Adaptive timeout state for this validation.
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Time of the most recent retry, if any.
    pub(super) last_retry_at: Option<Instant>,
}
/// State of a hole-punching coordination round.
#[derive(Debug)]
#[allow(dead_code)]
pub(super) struct CoordinationState {
    /// Round identifier; logged when the round is cleaned up.
    pub(super) round: VarInt,
    /// Targets to punch during this round.
    pub(super) punch_targets: Vec<PunchTarget>,
    /// When the round started; cleanup expires the round based on this.
    pub(super) round_start: Instant,
    /// Punch start time (usage site not in this section).
    pub(super) punch_start: Instant,
    /// Round duration (usage site not in this section).
    pub(super) round_duration: Duration,
    /// Current phase; a `Failed` phase makes the round eligible for cleanup.
    pub(super) state: CoordinationPhase,
    /// Presumably: whether our punch request has been sent — confirm.
    pub(super) punch_request_sent: bool,
    /// Presumably: whether the peer's punch has been received — confirm.
    pub(super) peer_punch_received: bool,
    /// Retries performed so far (update site not in this section).
    pub(super) retry_count: u32,
    /// Retry budget (enforcement site not in this section).
    pub(super) max_retries: u32,
    /// Adaptive timeout state for this round.
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Time of the most recent retry, if any.
    pub(super) last_retry_at: Option<Instant>,
}
/// Phase of a coordination round.
///
/// Only `Failed` is interpreted in this section (it triggers cleanup of the
/// round); the transitions between the other phases live elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub(crate) enum CoordinationPhase {
    Idle,
    Requesting,
    Coordinating,
    Preparing,
    Punching,
    Validating,
    Succeeded,
    /// The round failed; the resource manager drops rounds in this phase.
    Failed,
}
/// One target of a hole-punching attempt.
#[derive(Debug, Clone)]
pub(super) struct PunchTarget {
    /// Remote address to punch towards.
    pub(super) remote_addr: SocketAddr,
    /// Sequence number associated with the remote side (presumably the
    /// remote candidate's sequence number — confirm).
    pub(super) remote_sequence: VarInt,
    /// Challenge value associated with this target.
    pub(super) challenge: u64,
}
/// Action selected in response to a timeout.
///
/// NOTE(review): the code that produces and consumes these values is outside
/// this section; the variant names are the only description available here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum TimeoutAction {
    RetryDiscovery,
    RetryCoordination,
    StartValidation,
    Complete,
    Failed,
}
203
/// A pairing of one local and one remote address.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(super) struct CandidatePair {
    /// Sequence number associated with the remote side of the pair.
    pub(super) remote_sequence: VarInt,
    /// Local address of the pair.
    pub(super) local_addr: SocketAddr,
    /// Remote address of the pair.
    pub(super) remote_addr: SocketAddr,
    /// Pair priority (see `calculate_pair_priority` for the formula this
    /// file provides).
    pub(super) priority: u64,
    /// Current state; `Failed` pairs are removed by the regular cleanup.
    pub(super) state: PairState,
    /// Classification of the pair (see `classify_pair_type`).
    pub(super) pair_type: PairType,
    /// Creation time; cleanup ages pairs from here.
    pub(super) created_at: Instant,
    /// Time of the most recent check, if any.
    pub(super) last_check: Option<Instant>,
}
/// State of a candidate pair.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub(super) enum PairState {
    /// Pairs in this state are dropped by the aggressive cleanup once they
    /// are older than half the candidate timeout.
    Waiting,
    Succeeded,
    /// Pairs in this state are dropped by the regular cleanup.
    Failed,
    Frozen,
}
/// Classification of a candidate pair by the types of its two sides, as
/// produced by `classify_pair_type` (local side first, remote side second).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum PairType {
    /// Host local candidate, host remote candidate.
    HostToHost,
    /// Host local candidate, server-reflexive remote candidate.
    HostToServerReflexive,
    /// Server-reflexive local candidate, host remote candidate.
    ServerReflexiveToHost,
    /// Server-reflexive candidates on both sides.
    ServerReflexiveToServerReflexive,
    /// At least one side is peer-reflexive.
    PeerReflexive,
}
/// Candidate type used for priority calculation and pair classification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CandidateType {
    /// Type preference 126 in `calculate_candidate_priority`.
    Host,
    /// Type preference 100 in `calculate_candidate_priority`.
    ServerReflexive,
    /// Type preference 110 in `calculate_candidate_priority`.
    PeerReflexive,
}
262
263#[allow(dead_code)]
266fn calculate_candidate_priority(
267 candidate_type: CandidateType,
268 local_preference: u16,
269 component_id: u8,
270) -> u32 {
271 let type_preference = match candidate_type {
272 CandidateType::Host => 126,
273 CandidateType::PeerReflexive => 110,
274 CandidateType::ServerReflexive => 100,
275 };
276 (1u32 << 24) * type_preference + (1u32 << 8) * local_preference as u32 + component_id as u32
278}
279
/// Computes the 64-bit priority of a candidate pair.
///
/// With `g = local_priority` and `d = remote_priority` the result is
/// `2^32 * min(g, d) + 2 * max(g, d) + (1 if g > d else 0)`.
///
/// The sum exceeds `u64::MAX` only when both inputs are `u32::MAX`; in that
/// case the result saturates at `u64::MAX` instead of panicking (overflow
/// checks on) or wrapping to a tiny value (overflow checks off).
fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
    let g = u64::from(local_priority);
    let d = u64::from(remote_priority);

    // `min(g, d) < 2^32`, so the shift itself can never overflow.
    let high = g.min(d) << 32;
    // `max(g, d) < 2^32`, so doubling stays below 2^33.
    let low = 2 * g.max(d);
    let tie_breaker = u64::from(g > d);

    high.saturating_add(low).saturating_add(tie_breaker)
}
288
289fn classify_candidate_type(source: CandidateSource) -> CandidateType {
291 match source {
292 CandidateSource::Local => CandidateType::Host,
293 CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
294 CandidateSource::Peer => CandidateType::PeerReflexive,
295 CandidateSource::Predicted => CandidateType::ServerReflexive, }
297}
298fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
300 match (local_type, remote_type) {
301 (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
302 (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
303 (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
304 (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => {
305 PairType::ServerReflexiveToServerReflexive
306 }
307 (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => {
308 PairType::PeerReflexive
309 }
310 }
311}
312fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
314 match (local.address, remote.address) {
316 (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
317 (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
318 _ => false, }
320}
/// Counters describing NAT traversal activity.
///
/// NOTE(review): none of these counters is updated in this section of the
/// file; the per-field meaning below is taken from the field names only and
/// should be confirmed at the update sites.
#[derive(Debug, Default, Clone)]
#[allow(dead_code)]
pub(crate) struct NatTraversalStats {
    pub(super) remote_candidates_received: u32,
    pub(super) local_candidates_sent: u32,
    pub(super) validations_succeeded: u32,
    pub(super) validations_failed: u32,
    pub(super) coordination_rounds: u32,
    pub(super) successful_coordinations: u32,
    pub(super) failed_coordinations: u32,
    pub(super) timed_out_coordinations: u32,
    pub(super) coordination_failures: u32,
    pub(super) direct_connections: u32,
    pub(super) security_rejections: u32,
    pub(super) rate_limit_violations: u32,
    pub(super) invalid_address_rejections: u32,
    pub(super) suspicious_coordination_attempts: u32,
}
/// Rate limiting and address validation state used to vet candidates and
/// coordination requests.
#[derive(Debug)]
#[allow(dead_code)]
pub(super) struct SecurityValidationState {
    /// Timestamps of accepted candidates inside the current rate window,
    /// oldest first.
    candidate_rate_tracker: VecDeque<Instant>,
    /// Maximum number of candidates accepted per `rate_window`.
    max_candidates_per_window: u32,
    /// Length of the sliding window shared by both rate limiters.
    rate_window: Duration,
    /// Accepted coordination requests inside the current rate window,
    /// oldest first.
    coordination_requests: VecDeque<CoordinationRequest>,
    /// Maximum number of coordination requests accepted per `rate_window`.
    max_coordination_per_window: u32,
    /// Memoized results of `perform_address_validation`, keyed by address.
    address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
    /// Configured cache timeout. NOTE(review): set by the constructors but
    /// not read anywhere in the visible methods — cache entries carry no
    /// timestamp.
    validation_cache_timeout: Duration,
}
/// Record of one accepted coordination request, kept for rate limiting.
#[derive(Debug, Clone)]
struct CoordinationRequest {
    /// When the request was accepted by the rate limiter.
    timestamp: Instant,
}
/// Outcome of validating a socket address.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressValidationResult {
    /// The address passed every check.
    Valid,
    /// The address can never be a usable peer address (e.g. unspecified or
    /// broadcast); callers map this to `NatTraversalError::InvalidAddress`.
    Invalid,
    /// The address is usable in principle but looks wrong (e.g. multicast,
    /// documentation range, privileged port); callers map this to
    /// `NatTraversalError::SuspiciousCoordination`.
    Suspicious,
}
/// Adaptive timeout driven by RTT estimates, exponential backoff and jitter.
#[derive(Debug, Clone)]
pub(super) struct AdaptiveTimeoutState {
    /// Most recently computed timeout, already clamped to
    /// `[min_timeout, max_timeout]`.
    current_timeout: Duration,
    /// Lower clamp bound for `current_timeout`.
    min_timeout: Duration,
    /// Upper clamp bound for `current_timeout`.
    max_timeout: Duration,
    /// Timeout used while no RTT estimate is available.
    base_timeout: Duration,
    /// Current backoff factor: doubled on every timeout, multiplied by 0.8
    /// on every success, never below 1.0.
    backoff_multiplier: f64,
    /// Upper bound for `backoff_multiplier`.
    max_backoff_multiplier: f64,
    /// Relative jitter amplitude: the timeout is scaled by a random factor
    /// in `1 ± jitter_factor`.
    jitter_factor: f64,
    /// Smoothed RTT estimate, once at least one sample has been seen.
    srtt: Option<Duration>,
    /// RTT variation estimate, once at least one sample has been seen.
    rttvar: Option<Duration>,
    /// Most recent RTT sample.
    last_rtt: Option<Duration>,
    /// Timeouts since the last successful response.
    consecutive_timeouts: u32,
    /// Total number of successful responses recorded.
    successful_responses: u32,
}
/// Tracks observed network conditions.
///
/// NOTE(review): no method of this type is visible in this section; the
/// field descriptions below follow the field names and types only.
#[derive(Debug)]
#[allow(dead_code)]
pub(super) struct NetworkConditionMonitor {
    /// Recorded RTT samples.
    rtt_samples: VecDeque<Duration>,
    /// Presumably the cap on `rtt_samples` — confirm.
    max_samples: usize,
    packet_loss_rate: f64,
    congestion_window: u32,
    quality_score: f64,
    last_quality_update: Instant,
    quality_update_interval: Duration,
    /// Aggregate timeout/response statistics.
    timeout_stats: TimeoutStatistics,
}
/// Aggregate timeout and response statistics.
///
/// NOTE(review): these fields are not updated in this section; confirm their
/// exact meaning at the update sites.
#[derive(Debug, Default)]
struct TimeoutStatistics {
    total_timeouts: u64,
    total_responses: u64,
    avg_response_time: Duration,
    timeout_rate: f64,
    last_update: Option<Instant>,
}
452#[allow(dead_code)]
453impl SecurityValidationState {
454 fn new() -> Self {
456 Self {
457 candidate_rate_tracker: VecDeque::new(),
458 max_candidates_per_window: 20, rate_window: Duration::from_secs(60),
460 coordination_requests: VecDeque::new(),
461 max_coordination_per_window: 5, address_validation_cache: HashMap::new(),
463 validation_cache_timeout: Duration::from_secs(300), }
465 }
466 fn new_with_limits(
468 max_candidates_per_window: u32,
469 max_coordination_per_window: u32,
470 rate_window: Duration,
471 ) -> Self {
472 Self {
473 candidate_rate_tracker: VecDeque::new(),
474 max_candidates_per_window,
475 rate_window,
476 coordination_requests: VecDeque::new(),
477 max_coordination_per_window,
478 address_validation_cache: HashMap::new(),
479 validation_cache_timeout: Duration::from_secs(300),
480 }
481 }
482 fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
487 self.cleanup_rate_tracker(now);
489 self.cleanup_coordination_tracker(now);
490 let _current_candidate_rate =
492 self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
493 let _current_coordination_rate =
494 self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
495
496 let peer_reputation = self.calculate_peer_reputation(peer_id);
498 let adaptive_candidate_limit =
499 (self.max_candidates_per_window as f64 * peer_reputation) as u32;
500 let adaptive_coordination_limit =
501 (self.max_coordination_per_window as f64 * peer_reputation) as u32;
502
503 if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
505 debug!(
506 "Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
507 hex::encode(&peer_id[..8]),
508 self.candidate_rate_tracker.len(),
509 adaptive_candidate_limit
510 );
511 return true;
512 }
513
514 if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
515 debug!(
516 "Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
517 hex::encode(&peer_id[..8]),
518 self.coordination_requests.len(),
519 adaptive_coordination_limit
520 );
521 return true;
522 }
523
524 false
525 }
526
/// Returns the reputation factor used to scale rate limits for a peer.
///
/// Currently a stub: every peer gets `1.0` (limits are applied unscaled) and
/// `_peer_id` is ignored.
fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
    1.0
}
542
543 fn validate_amplification_limits(
548 &mut self,
549 source_addr: SocketAddr,
550 target_addr: SocketAddr,
551 now: Instant,
552 ) -> Result<(), NatTraversalError> {
553 let amplification_key = (source_addr, target_addr);
555 if self.is_amplification_suspicious(amplification_key, now) {
564 warn!(
565 "Potential amplification attack detected: {} -> {}",
566 source_addr, target_addr
567 );
568 return Err(NatTraversalError::SuspiciousCoordination);
569 }
570
571 Ok(())
572 }
573
/// Decides whether a `(source, target)` address pair looks like an
/// amplification attempt.
///
/// Currently a stub: both arguments are ignored and the answer is always
/// `false`.
fn is_amplification_suspicious(
    &self,
    _amplification_key: (SocketAddr, SocketAddr),
    _now: Instant,
) -> bool {
    false
}
590
591 fn generate_secure_coordination_round(&self) -> VarInt {
596 let secure_random: u64 = rand::random();
598 let bounded_random = secure_random % 1000000; VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
602 }
603
604 fn enhanced_address_validation(
612 &mut self,
613 addr: SocketAddr,
614 source_addr: SocketAddr,
615 now: Instant,
616 ) -> Result<AddressValidationResult, NatTraversalError> {
617 let basic_result = self.validate_address(addr, now);
619 match basic_result {
620 AddressValidationResult::Invalid => {
621 return Err(NatTraversalError::InvalidAddress);
622 }
623 AddressValidationResult::Suspicious => {
624 return Err(NatTraversalError::SuspiciousCoordination);
625 }
626 AddressValidationResult::Valid => {
627 }
629 }
630
631 self.validate_amplification_limits(source_addr, addr, now)?;
633
634 if self.is_address_in_suspicious_range(addr) {
636 warn!("Address in suspicious range detected: {}", addr);
637 return Err(NatTraversalError::SuspiciousCoordination);
638 }
639
640 if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
641 warn!(
642 "Suspicious coordination pattern detected: {} -> {}",
643 source_addr, addr
644 );
645 return Err(NatTraversalError::SuspiciousCoordination);
646 }
647
648 Ok(AddressValidationResult::Valid)
649 }
650
651 fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
653 match addr.ip() {
654 IpAddr::V4(ipv4) => {
655 let octets = ipv4.octets();
657 if octets[0] == 0 || octets[0] == 127 {
659 return true;
660 }
661
662 if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
664 return true;
665 }
666 if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
667 return true;
668 }
669 if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
670 return true;
671 }
672
673 false
674 }
675 IpAddr::V6(ipv6) => {
676 if ipv6.is_loopback() || ipv6.is_unspecified() {
678 return true;
679 }
680
681 let segments = ipv6.segments();
683 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
684 return true;
685 }
686
687 false
688 }
689 }
690 }
691
/// Decides whether a coordination request from `_source_addr` towards
/// `_target_addr` matches a suspicious pattern.
///
/// Currently a stub: all arguments are ignored and the answer is always
/// `false`.
fn is_coordination_pattern_suspicious(
    &self,
    _source_addr: SocketAddr,
    _target_addr: SocketAddr,
    _now: Instant,
) -> bool {
    false
}
709
710 fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
712 self.cleanup_rate_tracker(now);
714 if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
716 return true;
717 }
718
719 self.candidate_rate_tracker.push_back(now);
721 false
722 }
723
724 fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
726 self.cleanup_coordination_tracker(now);
728 if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
730 return true;
731 }
732
733 let request = CoordinationRequest { timestamp: now };
735 self.coordination_requests.push_back(request);
736 false
737 }
738
739 fn cleanup_rate_tracker(&mut self, now: Instant) {
741 let cutoff = now - self.rate_window;
742 while let Some(&front_time) = self.candidate_rate_tracker.front() {
743 if front_time < cutoff {
744 self.candidate_rate_tracker.pop_front();
745 } else {
746 break;
747 }
748 }
749 }
750 fn cleanup_coordination_tracker(&mut self, now: Instant) {
752 let cutoff = now - self.rate_window;
753 while let Some(front_request) = self.coordination_requests.front() {
754 if front_request.timestamp < cutoff {
755 self.coordination_requests.pop_front();
756 } else {
757 break;
758 }
759 }
760 }
761 fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
763 if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
765 return cached_result;
766 }
767 let result = self.perform_address_validation(addr);
768
769 self.address_validation_cache.insert(addr, result);
771
772 if self.address_validation_cache.len() > 1000 {
774 self.cleanup_address_cache(now);
775 }
776
777 result
778 }
779
780 fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
782 match addr.ip() {
783 IpAddr::V4(ipv4) => {
784 if ipv4.is_unspecified() || ipv4.is_broadcast() {
786 return AddressValidationResult::Invalid;
787 }
788 if ipv4.is_multicast() || ipv4.is_documentation() {
790 return AddressValidationResult::Suspicious;
791 }
792
793 if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
795 return AddressValidationResult::Invalid;
796 }
797
798 if self.is_suspicious_ipv4(ipv4) {
800 return AddressValidationResult::Suspicious;
801 }
802 }
803 IpAddr::V6(ipv6) => {
804 if ipv6.is_unspecified() || ipv6.is_multicast() {
806 return AddressValidationResult::Invalid;
807 }
808
809 if self.is_suspicious_ipv6(ipv6) {
811 return AddressValidationResult::Suspicious;
812 }
813 }
814 }
815
816 if addr.port() == 0 || addr.port() < 1024 {
818 return AddressValidationResult::Suspicious;
819 }
820
821 AddressValidationResult::Valid
822 }
823
824 fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
826 let octets = ipv4.octets();
827 if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
830 return true;
831 }
832
833 false
836 }
837
838 fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
840 let segments = ipv6.segments();
841 if segments.iter().all(|&s| s == segments[0]) {
843 return true;
844 }
845
846 false
847 }
848
849 fn cleanup_address_cache(&mut self, _now: Instant) {
851 if self.address_validation_cache.len() > 500 {
854 let keys_to_remove: Vec<_> = self
855 .address_validation_cache
856 .keys()
857 .take(self.address_validation_cache.len() / 2)
858 .copied()
859 .collect();
860 for key in keys_to_remove {
861 self.address_validation_cache.remove(&key);
862 }
863 }
864 }
865
866 fn validate_punch_me_now_frame(
874 &mut self,
875 frame: &crate::frame::PunchMeNow,
876 source_addr: SocketAddr,
877 peer_id: [u8; 32],
878 now: Instant,
879 ) -> Result<(), NatTraversalError> {
880 if self.is_coordination_rate_limited(now) {
882 debug!(
883 "PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}",
884 hex::encode(&peer_id[..8])
885 );
886 return Err(NatTraversalError::RateLimitExceeded);
887 }
888 let addr_validation = self.validate_address(frame.address, now);
890 match addr_validation {
891 AddressValidationResult::Invalid => {
892 debug!(
893 "PUNCH_ME_NOW frame rejected: invalid address {:?} from peer {:?}",
894 frame.address,
895 hex::encode(&peer_id[..8])
896 );
897 return Err(NatTraversalError::InvalidAddress);
898 }
899 AddressValidationResult::Suspicious => {
900 debug!(
901 "PUNCH_ME_NOW frame rejected: suspicious address {:?} from peer {:?}",
902 frame.address,
903 hex::encode(&peer_id[..8])
904 );
905 return Err(NatTraversalError::SuspiciousCoordination);
906 }
907 AddressValidationResult::Valid => {
908 }
910 }
911
912 if !self.validate_address_consistency(frame.address, source_addr) {
915 debug!(
916 "PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
917 frame.address, source_addr
918 );
919 return Err(NatTraversalError::SuspiciousCoordination);
920 }
921
922 if !self.validate_coordination_parameters(frame) {
924 debug!(
925 "PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
926 hex::encode(&peer_id[..8])
927 );
928 return Err(NatTraversalError::SuspiciousCoordination);
929 }
930
931 if let Some(target_peer_id) = frame.target_peer_id {
933 if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
934 debug!(
935 "PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
936 hex::encode(&peer_id[..8]),
937 hex::encode(&target_peer_id[..8])
938 );
939 return Err(NatTraversalError::SuspiciousCoordination);
940 }
941 }
942
943 if !self.validate_resource_limits(frame) {
945 debug!(
946 "PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
947 hex::encode(&peer_id[..8])
948 );
949 return Err(NatTraversalError::ResourceLimitExceeded);
950 }
951
952 debug!(
953 "PUNCH_ME_NOW frame validation passed for peer {:?}",
954 hex::encode(&peer_id[..8])
955 );
956 Ok(())
957 }
958
959 fn validate_address_consistency(
964 &self,
965 claimed_addr: SocketAddr,
966 observed_addr: SocketAddr,
967 ) -> bool {
968 match (claimed_addr.ip(), observed_addr.ip()) {
972 (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
973 if claimed_ip == observed_ip {
975 return true;
976 }
977
978 if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
980 return true;
981 }
982
983 !claimed_ip.is_private() && !observed_ip.is_private()
986 }
987 (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
988 claimed_ip == observed_ip || self.are_in_same_prefix_v6(claimed_ip, observed_ip)
990 }
991 _ => {
992 false
994 }
995 }
996 }
997
998 fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
1000 let ip1_octets = ip1.octets();
1002 let ip2_octets = ip2.octets();
1003 if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
1005 return true;
1006 }
1007
1008 if ip1_octets[0] == 172
1010 && ip2_octets[0] == 172
1011 && (16..=31).contains(&ip1_octets[1])
1012 && (16..=31).contains(&ip2_octets[1])
1013 {
1014 return true;
1015 }
1016
1017 if ip1_octets[0] == 192
1019 && ip1_octets[1] == 168
1020 && ip2_octets[0] == 192
1021 && ip2_octets[1] == 168
1022 {
1023 return true;
1024 }
1025
1026 false
1027 }
1028
1029 fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1031 let segments1 = ip1.segments();
1033 let segments2 = ip2.segments();
1034 segments1[0] == segments2[0]
1035 && segments1[1] == segments2[1]
1036 && segments1[2] == segments2[2]
1037 && segments1[3] == segments2[3]
1038 }
1039
1040 fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1042 if frame.round.into_inner() > 1000000 {
1044 return false;
1045 }
1046 if frame.paired_with_sequence_number.into_inner() > 10000 {
1048 return false;
1049 }
1050
1051 match frame.address.ip() {
1053 IpAddr::V4(ipv4) => {
1054 !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1056 }
1057 IpAddr::V6(ipv6) => {
1058 !ipv6.is_unspecified() && !ipv6.is_multicast()
1060 }
1061 }
1062 }
1063
1064 fn validate_target_peer_request(
1066 &self,
1067 requesting_peer: [u8; 32],
1068 target_peer: [u8; 32],
1069 _frame: &crate::frame::PunchMeNow,
1070 ) -> bool {
1071 if requesting_peer == target_peer {
1073 return false;
1074 }
1075 true
1081 }
1082
1083 fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1085 self.coordination_requests.len() < self.max_coordination_per_window as usize
1093 }
1094}
1095
1096impl AdaptiveTimeoutState {
1097 pub(crate) fn new() -> Self {
1099 let base_timeout = Duration::from_millis(1000); Self {
1101 current_timeout: base_timeout,
1102 min_timeout: Duration::from_millis(100),
1103 max_timeout: Duration::from_secs(30),
1104 base_timeout,
1105 backoff_multiplier: 1.0,
1106 max_backoff_multiplier: 8.0,
1107 jitter_factor: 0.1, srtt: None,
1109 rttvar: None,
1110 last_rtt: None,
1111 consecutive_timeouts: 0,
1112 successful_responses: 0,
1113 }
1114 }
1115 fn update_success(&mut self, rtt: Duration) {
1117 self.last_rtt = Some(rtt);
1118 self.successful_responses += 1;
1119 self.consecutive_timeouts = 0;
1120 match self.srtt {
1122 None => {
1123 self.srtt = Some(rtt);
1124 self.rttvar = Some(rtt / 2);
1125 }
1126 Some(srtt) => {
1127 let rttvar = self.rttvar.unwrap_or(rtt / 2);
1128 let abs_diff = rtt.abs_diff(srtt);
1129
1130 self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1131 self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1132 }
1133 }
1134
1135 self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1137
1138 self.calculate_current_timeout();
1140 }
1141
1142 fn update_timeout(&mut self) {
1144 self.consecutive_timeouts += 1;
1145 self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1147
1148 self.calculate_current_timeout();
1150 }
1151
1152 fn calculate_current_timeout(&mut self) {
1154 let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1155 srtt + rttvar * 4
1157 } else {
1158 self.base_timeout
1159 };
1160 let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1162
1163 let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1165 let timeout = timeout.mul_f64(jitter);
1166
1167 self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1169 }
1170
1171 fn get_timeout(&self) -> Duration {
1173 self.current_timeout
1174 }
1175 fn should_retry(&self, max_retries: u32) -> bool {
1177 self.consecutive_timeouts < max_retries
1178 }
1179 fn get_retry_delay(&self) -> Duration {
1181 let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1182 delay.clamp(self.min_timeout, self.max_timeout)
1183 }
1184}
/// Limits and timeouts that drive resource cleanup.
#[derive(Debug)]
#[allow(dead_code)]
pub(super) struct ResourceManagementConfig {
    /// Limit on in-flight path validations.
    max_active_validations: usize,
    /// Limit on local candidates.
    max_local_candidates: usize,
    /// Limit on remote candidates.
    max_remote_candidates: usize,
    /// Limit on candidate pairs.
    max_candidate_pairs: usize,
    /// Coordination history limit (not read in the visible code).
    max_coordination_history: usize,
    /// Minimum time between two regular cleanup passes.
    cleanup_interval: Duration,
    /// Age after which candidates and candidate pairs are considered stale.
    candidate_timeout: Duration,
    /// Age after which an in-flight validation is considered expired.
    validation_timeout: Duration,
    /// Age after which a coordination round is considered expired.
    coordination_timeout: Duration,
    /// Memory pressure (usage / total limit) above which a cleanup is due
    /// regardless of the interval.
    memory_pressure_threshold: f64,
    /// Threshold for aggressive cleanup (the comparison site is not in this
    /// section).
    aggressive_cleanup_threshold: f64,
}
/// Statistics kept by the resource cleanup coordinator.
///
/// Only `cleanup_operations`, `resources_cleaned` and `memory_pressure` are
/// updated in the visible code; the remaining fields are described by name
/// only and should be confirmed at their update sites.
#[derive(Debug, Default)]
#[allow(dead_code)]
pub(super) struct ResourceStats {
    active_validations: usize,
    local_candidates: usize,
    remote_candidates: usize,
    candidate_pairs: usize,
    peak_memory_usage: usize,
    /// Number of regular cleanup passes performed.
    cleanup_operations: u64,
    /// Total number of resources removed by regular cleanup passes.
    resources_cleaned: u64,
    allocation_failures: u64,
    last_cleanup: Option<Instant>,
    /// Most recent result of `calculate_memory_pressure`.
    memory_pressure: f64,
}
/// Decides when cleanup is due and removes stale NAT traversal resources.
#[derive(Debug)]
pub(super) struct ResourceCleanupCoordinator {
    /// Limits and timeouts in effect.
    config: ResourceManagementConfig,
    /// Running statistics.
    stats: ResourceStats,
    /// Time of the last regular cleanup pass; `None` until the first pass.
    last_cleanup: Option<Instant>,
    /// Number of regular cleanup passes performed.
    cleanup_counter: u64,
    /// Set by `request_shutdown`; makes `should_cleanup` return `true`.
    shutdown_requested: bool,
}
1251impl ResourceManagementConfig {
1252 fn new() -> Self {
1254 Self {
1255 max_active_validations: 100,
1256 max_local_candidates: 50,
1257 max_remote_candidates: 100,
1258 max_candidate_pairs: 200,
1259 max_coordination_history: 10,
1260 cleanup_interval: Duration::from_secs(30),
1261 candidate_timeout: Duration::from_secs(300), validation_timeout: Duration::from_secs(30),
1263 coordination_timeout: Duration::from_secs(60),
1264 memory_pressure_threshold: 0.75,
1265 aggressive_cleanup_threshold: 0.90,
1266 }
1267 }
1268 #[cfg(feature = "low_memory")]
1270 fn low_memory() -> Self {
1271 Self {
1272 max_active_validations: 25,
1273 max_local_candidates: 10,
1274 max_remote_candidates: 25,
1275 max_candidate_pairs: 50,
1276 max_coordination_history: 3,
1277 cleanup_interval: Duration::from_secs(15),
1278 candidate_timeout: Duration::from_secs(180), validation_timeout: Duration::from_secs(20),
1280 coordination_timeout: Duration::from_secs(30),
1281 memory_pressure_threshold: 0.60,
1282 aggressive_cleanup_threshold: 0.80,
1283 }
1284 }
1285}
1286#[allow(dead_code)]
1287impl ResourceCleanupCoordinator {
1288 fn new() -> Self {
1290 Self {
1291 config: ResourceManagementConfig::new(),
1292 stats: ResourceStats::default(),
1293 last_cleanup: None,
1294 cleanup_counter: 0,
1295 shutdown_requested: false,
1296 }
1297 }
/// Creates a coordinator with the reduced `low_memory` configuration,
/// zeroed statistics and no cleanup performed yet.
#[cfg(feature = "low_memory")]
fn low_memory() -> Self {
    let config = ResourceManagementConfig::low_memory();
    let stats = ResourceStats::default();
    Self {
        config,
        stats,
        last_cleanup: None,
        cleanup_counter: 0,
        shutdown_requested: false,
    }
}
1309 fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1311 state.active_validations.len() > self.config.max_active_validations
1312 || state.local_candidates.len() > self.config.max_local_candidates
1313 || state.remote_candidates.len() > self.config.max_remote_candidates
1314 || state.candidate_pairs.len() > self.config.max_candidate_pairs
1315 }
1316 fn calculate_memory_pressure(
1318 &mut self,
1319 active_validations_len: usize,
1320 local_candidates_len: usize,
1321 remote_candidates_len: usize,
1322 candidate_pairs_len: usize,
1323 ) -> f64 {
1324 let total_limit = self.config.max_active_validations
1325 + self.config.max_local_candidates
1326 + self.config.max_remote_candidates
1327 + self.config.max_candidate_pairs;
1328 let current_usage = active_validations_len
1329 + local_candidates_len
1330 + remote_candidates_len
1331 + candidate_pairs_len;
1332
1333 let pressure = current_usage as f64 / total_limit as f64;
1334 self.stats.memory_pressure = pressure;
1335 pressure
1336 }
1337
1338 fn should_cleanup(&self, now: Instant) -> bool {
1340 if self.shutdown_requested {
1341 return true;
1342 }
1343 if let Some(last_cleanup) = self.last_cleanup {
1345 if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1346 return true;
1347 }
1348 } else {
1349 return true; }
1351
1352 if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1354 return true;
1355 }
1356
1357 false
1358 }
1359
1360 fn cleanup_expired_resources(
1362 &mut self,
1363 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1364 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1365 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1366 candidate_pairs: &mut Vec<CandidatePair>,
1367 coordination: &mut Option<CoordinationState>,
1368 now: Instant,
1369 ) -> u64 {
1370 let mut cleaned = 0;
1371 cleaned += self.cleanup_expired_validations(active_validations, now);
1373
1374 cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1376
1377 cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1379
1380 cleaned += self.cleanup_old_coordination(coordination, now);
1382
1383 self.stats.cleanup_operations += 1;
1385 self.stats.resources_cleaned += cleaned;
1386 self.last_cleanup = Some(now);
1387 self.cleanup_counter += 1;
1388
1389 debug!("Cleaned up {} expired resources", cleaned);
1390 cleaned
1391 }
1392
1393 fn cleanup_expired_validations(
1395 &mut self,
1396 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1397 now: Instant,
1398 ) -> u64 {
1399 let mut cleaned = 0;
1400 let validation_timeout = self.config.validation_timeout;
1401 active_validations.retain(|_addr, validation| {
1402 let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1403 if is_expired {
1404 cleaned += 1;
1405 trace!("Cleaned up expired validation for {:?}", _addr);
1406 }
1407 !is_expired
1408 });
1409
1410 cleaned
1411 }
1412
1413 fn cleanup_stale_candidates(
1415 &mut self,
1416 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1417 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1418 now: Instant,
1419 ) -> u64 {
1420 let mut cleaned = 0;
1421 let candidate_timeout = self.config.candidate_timeout;
1422 local_candidates.retain(|_seq, candidate| {
1424 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1425 || candidate.state == CandidateState::Failed
1426 || candidate.state == CandidateState::Removed;
1427 if is_stale {
1428 cleaned += 1;
1429 trace!("Cleaned up stale local candidate {:?}", candidate.address);
1430 }
1431 !is_stale
1432 });
1433
1434 remote_candidates.retain(|_seq, candidate| {
1436 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1437 || candidate.state == CandidateState::Failed
1438 || candidate.state == CandidateState::Removed;
1439 if is_stale {
1440 cleaned += 1;
1441 trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1442 }
1443 !is_stale
1444 });
1445
1446 cleaned
1447 }
1448
1449 fn cleanup_failed_pairs(
1451 &mut self,
1452 candidate_pairs: &mut Vec<CandidatePair>,
1453 now: Instant,
1454 ) -> u64 {
1455 let mut cleaned = 0;
1456 let pair_timeout = self.config.candidate_timeout;
1457 candidate_pairs.retain(|pair| {
1458 let is_stale = now.duration_since(pair.created_at) > pair_timeout
1459 || pair.state == PairState::Failed;
1460 if is_stale {
1461 cleaned += 1;
1462 trace!(
1463 "Cleaned up failed candidate pair {:?} -> {:?}",
1464 pair.local_addr, pair.remote_addr
1465 );
1466 }
1467 !is_stale
1468 });
1469
1470 cleaned
1471 }
1472
1473 fn cleanup_old_coordination(
1475 &mut self,
1476 coordination: &mut Option<CoordinationState>,
1477 now: Instant,
1478 ) -> u64 {
1479 let mut cleaned = 0;
1480 if let Some(coord) = coordination {
1481 let is_expired =
1482 now.duration_since(coord.round_start) > self.config.coordination_timeout;
1483 let is_failed = coord.state == CoordinationPhase::Failed;
1484
1485 if is_expired || is_failed {
1486 let round = coord.round;
1487 *coordination = None;
1488 cleaned += 1;
1489 trace!("Cleaned up old coordination state for round {}", round);
1490 }
1491 }
1492
1493 cleaned
1494 }
1495
1496 fn aggressive_cleanup(
1498 &mut self,
1499 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1500 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1501 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1502 candidate_pairs: &mut Vec<CandidatePair>,
1503 now: Instant,
1504 ) -> u64 {
1505 let mut cleaned = 0;
1506 let aggressive_timeout = self.config.candidate_timeout / 2;
1508
1509 local_candidates.retain(|_seq, candidate| {
1511 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1512 && candidate.state != CandidateState::Failed;
1513 if !keep {
1514 cleaned += 1;
1515 }
1516 keep
1517 });
1518
1519 remote_candidates.retain(|_seq, candidate| {
1520 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1521 && candidate.state != CandidateState::Failed;
1522 if !keep {
1523 cleaned += 1;
1524 }
1525 keep
1526 });
1527
1528 candidate_pairs.retain(|pair| {
1530 let keep = pair.state != PairState::Waiting
1531 || now.duration_since(pair.created_at) <= aggressive_timeout;
1532 if !keep {
1533 cleaned += 1;
1534 }
1535 keep
1536 });
1537
1538 active_validations.retain(|_addr, validation| {
1540 let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1541 if !keep {
1542 cleaned += 1;
1543 }
1544 keep
1545 });
1546
1547 warn!(
1548 "Aggressive cleanup removed {} resources due to memory pressure",
1549 cleaned
1550 );
1551 cleaned
1552 }
1553
1554 fn request_shutdown(&mut self) {
1556 self.shutdown_requested = true;
1557 debug!("Resource cleanup coordinator shutdown requested");
1558 }
1559 fn shutdown_cleanup(
1561 &mut self,
1562 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1563 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1564 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1565 candidate_pairs: &mut Vec<CandidatePair>,
1566 coordination: &mut Option<CoordinationState>,
1567 ) -> u64 {
1568 let mut cleaned = 0;
1569 cleaned += active_validations.len() as u64;
1571 active_validations.clear();
1572
1573 cleaned += local_candidates.len() as u64;
1574 local_candidates.clear();
1575
1576 cleaned += remote_candidates.len() as u64;
1577 remote_candidates.clear();
1578
1579 cleaned += candidate_pairs.len() as u64;
1580 candidate_pairs.clear();
1581
1582 if coordination.is_some() {
1583 *coordination = None;
1584 cleaned += 1;
1585 }
1586
1587 info!("Shutdown cleanup removed {} resources", cleaned);
1588 cleaned
1589 }
1590
1591 fn get_resource_stats(&self) -> &ResourceStats {
1593 &self.stats
1594 }
1595 fn update_stats(
1597 &mut self,
1598 active_validations_len: usize,
1599 local_candidates_len: usize,
1600 remote_candidates_len: usize,
1601 candidate_pairs_len: usize,
1602 ) {
1603 self.stats.active_validations = active_validations_len;
1604 self.stats.local_candidates = local_candidates_len;
1605 self.stats.remote_candidates = remote_candidates_len;
1606 self.stats.candidate_pairs = candidate_pairs_len;
1607 let current_usage = self.stats.active_validations
1609 + self.stats.local_candidates
1610 + self.stats.remote_candidates
1611 + self.stats.candidate_pairs;
1612
1613 if current_usage > self.stats.peak_memory_usage {
1614 self.stats.peak_memory_usage = current_usage;
1615 }
1616 }
1617
1618 pub(super) fn perform_cleanup(&mut self, now: Instant) {
1620 self.last_cleanup = Some(now);
1621 self.cleanup_counter += 1;
1622 self.stats.cleanup_operations += 1;
1624
1625 debug!("Performed resource cleanup #{}", self.cleanup_counter);
1626 }
1627}
1628
1629#[allow(dead_code)]
1630impl NetworkConditionMonitor {
1631 fn new() -> Self {
1633 Self {
1634 rtt_samples: VecDeque::new(),
1635 max_samples: 20,
1636 packet_loss_rate: 0.0,
1637 congestion_window: 10,
1638 quality_score: 0.8, last_quality_update: Instant::now(),
1640 quality_update_interval: Duration::from_secs(10),
1641 timeout_stats: TimeoutStatistics::default(),
1642 }
1643 }
1644 fn record_success(&mut self, rtt: Duration, now: Instant) {
1646 self.rtt_samples.push_back(rtt);
1648 if self.rtt_samples.len() > self.max_samples {
1649 self.rtt_samples.pop_front();
1650 }
1651 self.timeout_stats.total_responses += 1;
1653 self.update_timeout_stats(now);
1654
1655 self.update_quality_score(now);
1657 }
1658
1659 fn record_timeout(&mut self, now: Instant) {
1661 self.timeout_stats.total_timeouts += 1;
1662 self.update_timeout_stats(now);
1663 self.update_quality_score(now);
1665 }
1666
1667 fn update_timeout_stats(&mut self, now: Instant) {
1669 let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1670 if total_attempts > 0 {
1671 self.timeout_stats.timeout_rate =
1672 self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1673 }
1674
1675 if !self.rtt_samples.is_empty() {
1677 let total_rtt: Duration = self.rtt_samples.iter().sum();
1678 self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1679 }
1680
1681 self.timeout_stats.last_update = Some(now);
1682 }
1683
1684 fn update_quality_score(&mut self, now: Instant) {
1686 if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1687 return;
1688 }
1689 let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1691 let rtt_factor = self.calculate_rtt_factor();
1692 let consistency_factor = self.calculate_consistency_factor();
1693
1694 let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1696
1697 self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1699 self.last_quality_update = now;
1700 }
1701
1702 fn calculate_rtt_factor(&self) -> f64 {
1704 if self.rtt_samples.is_empty() {
1705 return 0.5; }
1707 let avg_rtt = self.timeout_stats.avg_response_time;
1708
1709 let rtt_ms = avg_rtt.as_millis() as f64;
1711 let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1712 factor.clamp(0.0, 1.0)
1713 }
1714
1715 fn calculate_consistency_factor(&self) -> f64 {
1717 if self.rtt_samples.len() < 3 {
1718 return 0.5; }
1720 let mean_rtt = self.timeout_stats.avg_response_time;
1722 let variance: f64 = self
1723 .rtt_samples
1724 .iter()
1725 .map(|rtt| {
1726 let diff = (*rtt).abs_diff(mean_rtt);
1727 diff.as_millis() as f64
1728 })
1729 .map(|diff| diff * diff)
1730 .sum::<f64>()
1731 / self.rtt_samples.len() as f64;
1732
1733 let std_dev = variance.sqrt();
1734
1735 let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1737 consistency.clamp(0.0, 1.0)
1738 }
1739
1740 fn get_quality_score(&self) -> f64 {
1742 self.quality_score
1743 }
1744 fn get_estimated_rtt(&self) -> Option<Duration> {
1746 if self.rtt_samples.is_empty() {
1747 return None;
1748 }
1749 Some(self.timeout_stats.avg_response_time)
1750 }
1751
1752 fn is_suitable_for_coordination(&self) -> bool {
1754 self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1756 }
1757 fn get_packet_loss_rate(&self) -> f64 {
1759 self.packet_loss_rate
1760 }
1761
1762 fn get_timeout_multiplier(&self) -> f64 {
1764 let base_multiplier = 1.0;
1765
1766 let quality_multiplier = if self.quality_score < 0.3 {
1768 2.0 } else if self.quality_score > 0.8 {
1770 0.8 } else {
1772 1.0 };
1774
1775 let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1777
1778 base_multiplier * quality_multiplier * loss_multiplier
1779 }
1780
1781 fn cleanup(&mut self, now: Instant) {
1783 let _cutoff_time = now - Duration::from_secs(60);
1785
1786 if let Some(last_update) = self.timeout_stats.last_update {
1788 if now.duration_since(last_update) > Duration::from_secs(300) {
1789 self.timeout_stats = TimeoutStatistics::default();
1790 }
1791 }
1792 }
1793}
1794
1795#[allow(dead_code)]
1796impl NatTraversalState {
1797 pub(super) fn new(max_candidates: u32, coordination_timeout: Duration) -> Self {
1802 let bootstrap_coordinator = Some(BootstrapCoordinator::new(BootstrapConfig::default()));
1804
1805 Self {
1806 local_candidates: HashMap::new(),
1808 remote_candidates: HashMap::new(),
1809 candidate_pairs: Vec::new(),
1810 pair_index: HashMap::new(),
1811 active_validations: HashMap::new(),
1812 coordination: None,
1813 next_sequence: VarInt::from_u32(1),
1814 max_candidates,
1815 coordination_timeout,
1816 stats: NatTraversalStats::default(),
1817 security_state: SecurityValidationState::new(),
1818 network_monitor: NetworkConditionMonitor::new(),
1819 resource_manager: ResourceCleanupCoordinator::new(),
1820 bootstrap_coordinator,
1821 }
1822 }
1823
1824 pub(super) fn add_remote_candidate(
1826 &mut self,
1827 sequence: VarInt,
1828 address: SocketAddr,
1829 priority: VarInt,
1830 now: Instant,
1831 ) -> Result<(), NatTraversalError> {
1832 if self.should_reject_new_resources(now) {
1834 debug!(
1835 "Rejecting new candidate due to resource limits: {}",
1836 address
1837 );
1838 return Err(NatTraversalError::ResourceLimitExceeded);
1839 }
1840 if self.security_state.is_candidate_rate_limited(now) {
1842 self.stats.rate_limit_violations += 1;
1843 debug!("Rate limit exceeded for candidate addition: {}", address);
1844 return Err(NatTraversalError::RateLimitExceeded);
1845 }
1846
1847 match self.security_state.validate_address(address, now) {
1849 AddressValidationResult::Invalid => {
1850 self.stats.invalid_address_rejections += 1;
1851 self.stats.security_rejections += 1;
1852 debug!("Invalid address rejected: {}", address);
1853 return Err(NatTraversalError::InvalidAddress);
1854 }
1855 AddressValidationResult::Suspicious => {
1856 self.stats.security_rejections += 1;
1857 debug!("Suspicious address rejected: {}", address);
1858 return Err(NatTraversalError::SecurityValidationFailed);
1859 }
1860 AddressValidationResult::Valid => {
1861 }
1863 }
1864
1865 if self.remote_candidates.len() >= self.max_candidates as usize {
1867 return Err(NatTraversalError::TooManyCandidates);
1868 }
1869
1870 if self
1872 .remote_candidates
1873 .values()
1874 .any(|c| c.address == address && c.state != CandidateState::Removed)
1875 {
1876 return Err(NatTraversalError::DuplicateAddress);
1877 }
1878
1879 let candidate = AddressCandidate {
1880 address,
1881 priority: priority.into_inner() as u32,
1882 source: CandidateSource::Peer,
1883 discovered_at: now,
1884 state: CandidateState::New,
1885 attempt_count: 0,
1886 last_attempt: None,
1887 };
1888
1889 self.remote_candidates.insert(sequence, candidate);
1890 self.stats.remote_candidates_received += 1;
1891
1892 trace!(
1893 "Added remote candidate: {} with priority {}",
1894 address, priority
1895 );
1896 Ok(())
1897 }
1898
1899 pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
1901 if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
1902 candidate.state = CandidateState::Removed;
1903 self.active_validations.remove(&candidate.address);
1905 true
1906 } else {
1907 false
1908 }
1909 }
1910
1911 #[allow(clippy::expect_used)]
1913 pub(super) fn add_local_candidate(
1914 &mut self,
1915 address: SocketAddr,
1916 source: CandidateSource,
1917 now: Instant,
1918 ) -> VarInt {
1919 let sequence = self.next_sequence;
1920 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
1921 .expect("sequence number overflow");
1922 let candidate_type = classify_candidate_type(source);
1924 let local_preference = self.calculate_local_preference(address);
1925 let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
1926
1927 let candidate = AddressCandidate {
1928 address,
1929 priority,
1930 source,
1931 discovered_at: now,
1932 state: CandidateState::New,
1933 attempt_count: 0,
1934 last_attempt: None,
1935 };
1936
1937 self.local_candidates.insert(sequence, candidate);
1938 self.stats.local_candidates_sent += 1;
1939
1940 self.generate_candidate_pairs(now);
1942
1943 sequence
1944 }
1945
1946 fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
1948 match addr {
1949 SocketAddr::V4(v4) => {
1950 if v4.ip().is_loopback() {
1951 0 } else if v4.ip().is_private() {
1953 65000 } else {
1955 32000 }
1957 }
1958 SocketAddr::V6(v6) => {
1959 if v6.ip().is_loopback() {
1960 0
1961 } else if v6.ip().segments()[0] == 0xfe80 {
1962 30000 } else {
1965 50000 }
1967 }
1968 }
1969 }
1970 pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
1972 self.candidate_pairs.clear();
1973 self.pair_index.clear();
1974 let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
1976 self.candidate_pairs.reserve(estimated_capacity);
1977 self.pair_index.reserve(estimated_capacity);
1978
1979 let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
1981
1982 for local_candidate in self.local_candidates.values() {
1983 if local_candidate.state == CandidateState::Removed {
1985 continue;
1986 }
1987
1988 let local_type = classify_candidate_type(local_candidate.source);
1990
1991 for (remote_seq, remote_candidate) in &self.remote_candidates {
1992 if remote_candidate.state == CandidateState::Removed {
1994 continue;
1995 }
1996
1997 let cache_key = (local_candidate.address, remote_candidate.address);
1999 let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
2000 are_candidates_compatible(local_candidate, remote_candidate)
2001 });
2002
2003 if !compatible {
2004 continue;
2005 }
2006
2007 let pair_priority =
2009 calculate_pair_priority(local_candidate.priority, remote_candidate.priority);
2010
2011 let remote_type = classify_candidate_type(remote_candidate.source);
2013 let pair_type = classify_pair_type(local_type, remote_type);
2014
2015 let pair = CandidatePair {
2016 remote_sequence: *remote_seq,
2017 local_addr: local_candidate.address,
2018 remote_addr: remote_candidate.address,
2019 priority: pair_priority,
2020 state: PairState::Waiting,
2021 pair_type,
2022 created_at: now,
2023 last_check: None,
2024 };
2025
2026 let index = self.candidate_pairs.len();
2028 self.pair_index.insert(remote_candidate.address, index);
2029 self.candidate_pairs.push(pair);
2030 }
2031 }
2032
2033 self.candidate_pairs
2035 .sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2036
2037 self.pair_index.clear();
2039 for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2040 self.pair_index.insert(pair.remote_addr, idx);
2041 }
2042
2043 trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2044 }
2045
2046 pub(super) fn get_next_validation_pairs(
2048 &mut self,
2049 max_concurrent: usize,
2050 ) -> Vec<&mut CandidatePair> {
2051 let mut result = Vec::with_capacity(max_concurrent);
2054 for pair in self.candidate_pairs.iter_mut() {
2055 if pair.state == PairState::Waiting {
2056 result.push(pair);
2057 if result.len() >= max_concurrent {
2058 break;
2059 }
2060 }
2061 }
2062
2063 result
2064 }
2065
2066 pub(super) fn find_pair_by_remote_addr(
2068 &mut self,
2069 addr: SocketAddr,
2070 ) -> Option<&mut CandidatePair> {
2071 if let Some(&index) = self.pair_index.get(&addr) {
2073 self.candidate_pairs.get_mut(index)
2074 } else {
2075 None
2076 }
2077 }
2078 pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2080 let (succeeded_type, succeeded_priority) = {
2082 if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2083 pair.state = PairState::Succeeded;
2084 (pair.pair_type, pair.priority)
2085 } else {
2086 return false;
2087 }
2088 };
2089 for other_pair in &mut self.candidate_pairs {
2091 if other_pair.pair_type == succeeded_type
2092 && other_pair.priority < succeeded_priority
2093 && other_pair.state == PairState::Waiting
2094 {
2095 other_pair.state = PairState::Frozen;
2096 }
2097 }
2098
2099 true
2100 }
2101
2102 pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2104 let mut best_ipv4: Option<&CandidatePair> = None;
2105 let mut best_ipv6: Option<&CandidatePair> = None;
2106 for pair in &self.candidate_pairs {
2107 if pair.state != PairState::Succeeded {
2108 continue;
2109 }
2110
2111 match pair.remote_addr {
2112 SocketAddr::V4(_) => {
2113 if best_ipv4.is_none_or(|best| pair.priority > best.priority) {
2114 best_ipv4 = Some(pair);
2115 }
2116 }
2117 SocketAddr::V6(_) => {
2118 if best_ipv6.is_none_or(|best| pair.priority > best.priority) {
2119 best_ipv6 = Some(pair);
2120 }
2121 }
2122 }
2123 }
2124
2125 let mut result = Vec::new();
2126 if let Some(pair) = best_ipv4 {
2127 result.push(pair);
2128 }
2129 if let Some(pair) = best_ipv6 {
2130 result.push(pair);
2131 }
2132 result
2133 }
2134
2135 pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2137 let mut candidates: Vec<_> = self
2138 .remote_candidates
2139 .iter()
2140 .filter(|(_, c)| c.state == CandidateState::New)
2141 .map(|(k, v)| (*k, v))
2142 .collect();
2143 candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2145 candidates
2146 }
2147
2148 pub(super) fn start_validation(
2150 &mut self,
2151 sequence: VarInt,
2152 challenge: u64,
2153 now: Instant,
2154 ) -> Result<(), NatTraversalError> {
2155 let candidate = self
2156 .remote_candidates
2157 .get_mut(&sequence)
2158 .ok_or(NatTraversalError::UnknownCandidate)?;
2159 if candidate.state != CandidateState::New {
2160 return Err(NatTraversalError::InvalidCandidateState);
2161 }
2162
2163 if Self::is_validation_suspicious(candidate, now) {
2165 self.stats.security_rejections += 1;
2166 debug!(
2167 "Suspicious validation attempt rejected for address {}",
2168 candidate.address
2169 );
2170 return Err(NatTraversalError::SecurityValidationFailed);
2171 }
2172
2173 if self.active_validations.len() >= 10 {
2175 debug!(
2176 "Too many concurrent validations, rejecting new validation for {}",
2177 candidate.address
2178 );
2179 return Err(NatTraversalError::SecurityValidationFailed);
2180 }
2181
2182 candidate.state = CandidateState::Validating;
2184 candidate.attempt_count += 1;
2185 candidate.last_attempt = Some(now);
2186
2187 let validation = PathValidationState {
2189 challenge,
2190 sent_at: now,
2191 retry_count: 0,
2192 max_retries: 3, coordination_round: self.coordination.as_ref().map(|c| c.round),
2194 timeout_state: AdaptiveTimeoutState::new(),
2195 last_retry_at: None,
2196 };
2197
2198 self.active_validations
2199 .insert(candidate.address, validation);
2200 trace!(
2201 "Started validation for candidate {} with challenge {}",
2202 candidate.address, challenge
2203 );
2204 Ok(())
2205 }
2206
2207 fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2209 if candidate.attempt_count > 10 {
2211 return true;
2212 }
2213 if let Some(last_attempt) = candidate.last_attempt {
2215 let time_since_last = now.duration_since(last_attempt);
2216 if time_since_last < Duration::from_millis(100) {
2217 return true; }
2219 }
2220
2221 if candidate.state == CandidateState::Failed {
2223 let time_since_discovery = now.duration_since(candidate.discovered_at);
2224 if time_since_discovery < Duration::from_secs(60) {
2225 return true; }
2227 }
2228
2229 false
2230 }
2231
2232 pub(super) fn handle_validation_success(
2234 &mut self,
2235 remote_addr: SocketAddr,
2236 challenge: u64,
2237 now: Instant,
2238 ) -> Result<VarInt, NatTraversalError> {
2239 let sequence = self
2241 .remote_candidates
2242 .iter()
2243 .find(|(_, c)| c.address == remote_addr)
2244 .map(|(seq, _)| *seq)
2245 .ok_or(NatTraversalError::UnknownCandidate)?;
2246 let validation = self
2248 .active_validations
2249 .get_mut(&remote_addr)
2250 .ok_or(NatTraversalError::NoActiveValidation)?;
2251
2252 if validation.challenge != challenge {
2253 return Err(NatTraversalError::ChallengeMismatch);
2254 }
2255
2256 let rtt = now.duration_since(validation.sent_at);
2258 validation.timeout_state.update_success(rtt);
2259
2260 self.network_monitor.record_success(rtt, now);
2262
2263 let candidate = self
2265 .remote_candidates
2266 .get_mut(&sequence)
2267 .ok_or(NatTraversalError::UnknownCandidate)?;
2268
2269 candidate.state = CandidateState::Valid;
2270 self.active_validations.remove(&remote_addr);
2271 self.stats.validations_succeeded += 1;
2272
2273 trace!(
2274 "Validation successful for {} with RTT {:?}",
2275 remote_addr, rtt
2276 );
2277 Ok(sequence)
2278 }
2279
2280 pub(super) fn start_coordination_round(
2282 &mut self,
2283 targets: Vec<PunchTarget>,
2284 now: Instant,
2285 ) -> Result<VarInt, NatTraversalError> {
2286 if self.security_state.is_coordination_rate_limited(now) {
2288 self.stats.rate_limit_violations += 1;
2289 debug!(
2290 "Rate limit exceeded for coordination request with {} targets",
2291 targets.len()
2292 );
2293 return Err(NatTraversalError::RateLimitExceeded);
2294 }
2295 if self.is_coordination_suspicious(&targets, now) {
2297 self.stats.suspicious_coordination_attempts += 1;
2298 self.stats.security_rejections += 1;
2299 debug!(
2300 "Suspicious coordination request rejected with {} targets",
2301 targets.len()
2302 );
2303 return Err(NatTraversalError::SuspiciousCoordination);
2304 }
2305
2306 for target in &targets {
2308 match self
2309 .security_state
2310 .validate_address(target.remote_addr, now)
2311 {
2312 AddressValidationResult::Invalid => {
2313 self.stats.invalid_address_rejections += 1;
2314 self.stats.security_rejections += 1;
2315 debug!(
2316 "Invalid target address in coordination: {}",
2317 target.remote_addr
2318 );
2319 return Err(NatTraversalError::InvalidAddress);
2320 }
2321 AddressValidationResult::Suspicious => {
2322 self.stats.security_rejections += 1;
2323 debug!(
2324 "Suspicious target address in coordination: {}",
2325 target.remote_addr
2326 );
2327 return Err(NatTraversalError::SecurityValidationFailed);
2328 }
2329 AddressValidationResult::Valid => {
2330 }
2332 }
2333 }
2334
2335 let round = self.next_sequence;
2336 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2337 .expect("sequence number overflow");
2338
2339 let coordination_grace = Duration::from_millis(500); let punch_start = now + coordination_grace;
2342
2343 self.coordination = Some(CoordinationState {
2344 round,
2345 punch_targets: targets,
2346 round_start: now,
2347 punch_start,
2348 round_duration: self.coordination_timeout,
2349 state: CoordinationPhase::Requesting,
2350 punch_request_sent: false,
2351 peer_punch_received: false,
2352 retry_count: 0,
2353 max_retries: 3,
2354 timeout_state: AdaptiveTimeoutState::new(),
2355 last_retry_at: None,
2356 });
2357
2358 self.stats.coordination_rounds += 1;
2359 trace!(
2360 "Started coordination round {} with {} targets",
2361 round,
2362 self.coordination
2363 .as_ref()
2364 .map(|c| c.punch_targets.len())
2365 .unwrap_or(0)
2366 );
2367 Ok(round)
2368 }
2369
2370 fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2372 if targets.len() > 20 {
2374 return true;
2375 }
2376 let mut seen_addresses = std::collections::HashSet::new();
2378 for target in targets {
2379 if !seen_addresses.insert(target.remote_addr) {
2380 return true; }
2382 }
2383
2384 if targets.len() > 5 {
2386 let mut ipv4_addresses: Vec<_> = targets
2388 .iter()
2389 .filter_map(|t| match t.remote_addr.ip() {
2390 IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2391 _ => None,
2392 })
2393 .collect();
2394
2395 if ipv4_addresses.len() >= 3 {
2396 ipv4_addresses.sort();
2397 let mut sequential_count = 1;
2398 for i in 1..ipv4_addresses.len() {
2399 if ipv4_addresses[i] == ipv4_addresses[i - 1] + 1 {
2400 sequential_count += 1;
2401 if sequential_count >= 3 {
2402 return true; }
2404 } else {
2405 sequential_count = 1;
2406 }
2407 }
2408 }
2409 }
2410
2411 false
2412 }
2413
2414 pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2416 self.coordination.as_ref().map(|c| c.state)
2417 }
2418 pub(super) fn should_send_punch_request(&self) -> bool {
2420 if let Some(coord) = &self.coordination {
2421 coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2422 } else {
2423 false
2424 }
2425 }
2426 pub(super) fn mark_punch_request_sent(&mut self) {
2428 if let Some(coord) = &mut self.coordination {
2429 coord.punch_request_sent = true;
2430 coord.state = CoordinationPhase::Coordinating;
2431 trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2432 }
2433 }
2434 pub(super) fn handle_peer_punch_request(
2436 &mut self,
2437 peer_round: VarInt,
2438 now: Instant,
2439 ) -> Result<bool, NatTraversalError> {
2440 if self.is_peer_coordination_suspicious(peer_round, now) {
2442 self.stats.suspicious_coordination_attempts += 1;
2443 self.stats.security_rejections += 1;
2444 debug!(
2445 "Suspicious peer coordination request rejected for round {}",
2446 peer_round
2447 );
2448 return Err(NatTraversalError::SuspiciousCoordination);
2449 }
2450 if let Some(coord) = &mut self.coordination {
2451 if coord.round == peer_round {
2452 match coord.state {
2453 CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2454 coord.peer_punch_received = true;
2455 coord.state = CoordinationPhase::Preparing;
2456
2457 let network_rtt = self
2459 .network_monitor
2460 .get_estimated_rtt()
2461 .unwrap_or(Duration::from_millis(100));
2462 let quality_score = self.network_monitor.get_quality_score();
2463
2464 let base_grace = Duration::from_millis(150);
2466 let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2467 let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2468
2469 let adaptive_grace = Duration::from_millis(
2470 (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64,
2471 );
2472
2473 coord.punch_start = now + adaptive_grace;
2474
2475 trace!(
2476 "Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2477 adaptive_grace, network_rtt, quality_score
2478 );
2479 Ok(true)
2480 }
2481 CoordinationPhase::Preparing => {
2482 trace!("Peer coordination confirmed during preparation");
2484 Ok(true)
2485 }
2486 _ => {
2487 debug!(
2488 "Received coordination in unexpected phase: {:?}",
2489 coord.state
2490 );
2491 Ok(false)
2492 }
2493 }
2494 } else {
2495 debug!(
2496 "Received coordination for wrong round: {} vs {}",
2497 peer_round, coord.round
2498 );
2499 Ok(false)
2500 }
2501 } else {
2502 debug!("Received peer coordination but no active round");
2503 Ok(false)
2504 }
2505 }
2506
2507 fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2509 if peer_round.into_inner() == 0 {
2511 return true; }
2513 if let Some(coord) = &self.coordination {
2515 let our_round = coord.round.into_inner();
2516 let peer_round_num = peer_round.into_inner();
2517
2518 if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2520 return true;
2521 }
2522 }
2523
2524 false
2525 }
2526
2527 pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2529 if let Some(coord) = &self.coordination {
2530 match coord.state {
2531 CoordinationPhase::Preparing => now >= coord.punch_start,
2532 CoordinationPhase::Coordinating => {
2533 coord.peer_punch_received && now >= coord.punch_start
2535 }
2536 _ => false,
2537 }
2538 } else {
2539 false
2540 }
2541 }
2542 pub(super) fn start_punching_phase(&mut self, now: Instant) {
2544 if let Some(coord) = &mut self.coordination {
2545 coord.state = CoordinationPhase::Punching;
2546 let network_rtt = self
2548 .network_monitor
2549 .get_estimated_rtt()
2550 .unwrap_or(Duration::from_millis(100));
2551
2552 let jitter_ms: u64 = rand::random::<u64>() % 11;
2554 let jitter = Duration::from_millis(jitter_ms);
2555 let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2556
2557 coord.punch_start = transmission_time.max(now);
2559
2560 trace!(
2561 "Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2562 coord.punch_start, network_rtt, jitter
2563 );
2564 }
2565 }
2566
2567 pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2569 self.coordination
2570 .as_ref()
2571 .map(|c| c.punch_targets.as_slice())
2572 }
2573 pub(super) fn mark_coordination_validating(&mut self) {
2575 if let Some(coord) = &mut self.coordination {
2576 if coord.state == CoordinationPhase::Punching {
2577 coord.state = CoordinationPhase::Validating;
2578 trace!("Coordination moved to validation phase");
2579 }
2580 }
2581 }
2582 pub(super) fn handle_coordination_success(
2584 &mut self,
2585 remote_addr: SocketAddr,
2586 now: Instant,
2587 ) -> bool {
2588 if let Some(coord) = &mut self.coordination {
2589 let was_target = coord
2591 .punch_targets
2592 .iter()
2593 .any(|target| target.remote_addr == remote_addr);
2594 if was_target && coord.state == CoordinationPhase::Validating {
2595 let rtt = now.duration_since(coord.round_start);
2597 coord.timeout_state.update_success(rtt);
2598 self.network_monitor.record_success(rtt, now);
2599
2600 coord.state = CoordinationPhase::Succeeded;
2601 self.stats.direct_connections += 1;
2602 trace!(
2603 "Coordination succeeded via {} with RTT {:?}",
2604 remote_addr, rtt
2605 );
2606 true
2607 } else {
2608 false
2609 }
2610 } else {
2611 false
2612 }
2613 }
2614
2615 pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2617 if let Some(coord) = &mut self.coordination {
2618 coord.retry_count += 1;
2619 coord.timeout_state.update_timeout();
2620 self.network_monitor.record_timeout(now);
2621 if coord.timeout_state.should_retry(coord.max_retries)
2623 && self.network_monitor.is_suitable_for_coordination()
2624 {
2625 coord.state = CoordinationPhase::Requesting;
2627 coord.punch_request_sent = false;
2628 coord.peer_punch_received = false;
2629 coord.round_start = now;
2630 coord.last_retry_at = Some(now);
2631
2632 let retry_delay = coord.timeout_state.get_retry_delay();
2634
2635 let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2637 let adjusted_delay = Duration::from_millis(
2638 (retry_delay.as_millis() as f64 * quality_multiplier) as u64,
2639 );
2640
2641 coord.punch_start = now + adjusted_delay;
2642
2643 trace!(
2644 "Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2645 coord.round,
2646 coord.retry_count + 1,
2647 adjusted_delay,
2648 self.network_monitor.get_quality_score()
2649 );
2650 true
2651 } else {
2652 coord.state = CoordinationPhase::Failed;
2653 self.stats.coordination_failures += 1;
2654
2655 if !self.network_monitor.is_suitable_for_coordination() {
2656 trace!(
2657 "Coordination failed due to poor network conditions (quality: {:.2})",
2658 self.network_monitor.get_quality_score()
2659 );
2660 } else {
2661 trace!("Coordination failed after {} attempts", coord.retry_count);
2662 }
2663 false
2664 }
2665 } else {
2666 false
2667 }
2668 }
2669
2670 pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2672 if let Some(coord) = &mut self.coordination {
2673 let timeout = coord.timeout_state.get_timeout();
2674 let elapsed = now.duration_since(coord.round_start);
2675 if elapsed > timeout {
2676 trace!(
2677 "Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2678 coord.round, elapsed, timeout
2679 );
2680 self.handle_coordination_failure(now);
2681 true
2682 } else {
2683 false
2684 }
2685 } else {
2686 false
2687 }
2688 }
2689
2690 pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2692 let mut expired_validations = Vec::new();
2693 let mut retry_validations = Vec::new();
2694
2695 for (addr, validation) in &mut self.active_validations {
2696 let timeout = validation.timeout_state.get_timeout();
2697 let elapsed = now.duration_since(validation.sent_at);
2698
2699 if elapsed >= timeout {
2700 if validation
2701 .timeout_state
2702 .should_retry(validation.max_retries)
2703 {
2704 retry_validations.push(*addr);
2706 } else {
2707 expired_validations.push(*addr);
2709 }
2710 }
2711 }
2712
2713 for addr in retry_validations {
2715 if let Some(validation) = self.active_validations.get_mut(&addr) {
2716 validation.retry_count += 1;
2717 validation.sent_at = now;
2718 validation.last_retry_at = Some(now);
2719 validation.timeout_state.update_timeout();
2720
2721 trace!(
2722 "Retrying validation for {} (attempt {})",
2723 addr,
2724 validation.retry_count + 1
2725 );
2726 }
2727 }
2728
2729 for addr in &expired_validations {
2731 self.active_validations.remove(addr);
2732 self.network_monitor.record_timeout(now);
2733 trace!("Validation expired for {}", addr);
2734 }
2735
2736 expired_validations
2737 }
2738
2739 pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2741 let mut retry_addresses = Vec::new();
2742
2743 for (addr, validation) in &mut self.active_validations {
2745 let elapsed = now.duration_since(validation.sent_at);
2746 let timeout = validation.timeout_state.get_timeout();
2747
2748 if elapsed > timeout
2749 && validation
2750 .timeout_state
2751 .should_retry(validation.max_retries)
2752 {
2753 validation.retry_count += 1;
2755 validation.last_retry_at = Some(now);
2756 validation.sent_at = now; validation.timeout_state.update_timeout();
2758
2759 retry_addresses.push(*addr);
2760 trace!(
2761 "Scheduled retry {} for validation to {}",
2762 validation.retry_count, addr
2763 );
2764 }
2765 }
2766
2767 retry_addresses
2768 }
2769
2770 pub(super) fn update_network_conditions(&mut self, now: Instant) {
2772 self.network_monitor.cleanup(now);
2773
2774 let multiplier = self.network_monitor.get_timeout_multiplier();
2776
2777 for validation in self.active_validations.values_mut() {
2779 if multiplier > 1.5 {
2780 validation.timeout_state.backoff_multiplier =
2782 (validation.timeout_state.backoff_multiplier * 1.2)
2783 .min(validation.timeout_state.max_backoff_multiplier);
2784 } else if multiplier < 0.8 {
2785 validation.timeout_state.backoff_multiplier =
2787 (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2788 }
2789 }
2790 }
2791
2792 pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2794 if let Some(coord) = &self.coordination {
2795 if coord.retry_count > 0 {
2796 if let Some(last_retry) = coord.last_retry_at {
2797 let retry_delay = coord.timeout_state.get_retry_delay();
2798 return now.duration_since(last_retry) >= retry_delay;
2799 }
2800 }
2801 }
2802 false
2803 }
2804
2805 pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2807 self.resource_manager.update_stats(
2809 self.active_validations.len(),
2810 self.local_candidates.len(),
2811 self.remote_candidates.len(),
2812 self.candidate_pairs.len(),
2813 );
2814
2815 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2817 self.active_validations.len(),
2818 self.local_candidates.len(),
2819 self.remote_candidates.len(),
2820 self.candidate_pairs.len(),
2821 );
2822
2823 let mut cleaned = 0;
2825
2826 if self.resource_manager.should_cleanup(now) {
2827 cleaned += self.resource_manager.cleanup_expired_resources(
2828 &mut self.active_validations,
2829 &mut self.local_candidates,
2830 &mut self.remote_candidates,
2831 &mut self.candidate_pairs,
2832 &mut self.coordination,
2833 now,
2834 );
2835
2836 if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2838 cleaned += self.resource_manager.aggressive_cleanup(
2839 &mut self.active_validations,
2840 &mut self.local_candidates,
2841 &mut self.remote_candidates,
2842 &mut self.candidate_pairs,
2843 now,
2844 );
2845 }
2846 }
2847
2848 cleaned
2849 }
2850
2851 pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
2853 self.resource_manager.update_stats(
2855 self.active_validations.len(),
2856 self.local_candidates.len(),
2857 self.remote_candidates.len(),
2858 self.candidate_pairs.len(),
2859 );
2860 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2861 self.active_validations.len(),
2862 self.local_candidates.len(),
2863 self.remote_candidates.len(),
2864 self.candidate_pairs.len(),
2865 );
2866 if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
2868 self.resource_manager.stats.allocation_failures += 1;
2869 return true;
2870 }
2871
2872 if self.resource_manager.check_resource_limits(self) {
2874 self.resource_manager.stats.allocation_failures += 1;
2875 return true;
2876 }
2877
2878 false
2879 }
2880
2881 pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
2883 let mut next_timeout = None;
2884 if let Some(coord) = &self.coordination {
2886 match coord.state {
2887 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2888 let timeout_at = coord.round_start + self.coordination_timeout;
2889 next_timeout =
2890 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2891 }
2892 CoordinationPhase::Preparing => {
2893 next_timeout = Some(
2895 next_timeout
2896 .map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)),
2897 );
2898 }
2899 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2900 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2902 next_timeout =
2903 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2904 }
2905 _ => {}
2906 }
2907 }
2908
2909 for validation in self.active_validations.values() {
2911 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2912 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2913 }
2914
2915 if self.resource_manager.should_cleanup(now) {
2917 let cleanup_at = now + Duration::from_secs(1);
2919 next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
2920 }
2921
2922 next_timeout
2923 }
2924
2925 pub(super) fn handle_timeout(
2927 &mut self,
2928 now: Instant,
2929 ) -> Result<Vec<TimeoutAction>, NatTraversalError> {
2930 let mut actions = Vec::new();
2931 if let Some(coord) = &mut self.coordination {
2933 match coord.state {
2934 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2935 let timeout_at = coord.round_start + self.coordination_timeout;
2936 if now >= timeout_at {
2937 coord.retry_count += 1;
2938 if coord.retry_count >= coord.max_retries {
2939 debug!("Coordination failed after {} retries", coord.retry_count);
2940 coord.state = CoordinationPhase::Failed;
2941 actions.push(TimeoutAction::Failed);
2942 } else {
2943 debug!(
2944 "Coordination timeout, retrying ({}/{})",
2945 coord.retry_count, coord.max_retries
2946 );
2947 coord.state = CoordinationPhase::Requesting;
2948 coord.round_start = now;
2949 actions.push(TimeoutAction::RetryCoordination);
2950 }
2951 }
2952 }
2953 CoordinationPhase::Preparing => {
2954 if now >= coord.punch_start {
2956 debug!("Starting coordinated hole punching");
2957 coord.state = CoordinationPhase::Punching;
2958 actions.push(TimeoutAction::StartValidation);
2959 }
2960 }
2961 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2962 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2963 if now >= timeout_at {
2964 coord.retry_count += 1;
2965 if coord.retry_count >= coord.max_retries {
2966 debug!("Validation failed after {} retries", coord.retry_count);
2967 coord.state = CoordinationPhase::Failed;
2968 actions.push(TimeoutAction::Failed);
2969 } else {
2970 debug!(
2971 "Validation timeout, retrying ({}/{})",
2972 coord.retry_count, coord.max_retries
2973 );
2974 coord.state = CoordinationPhase::Punching;
2975 actions.push(TimeoutAction::StartValidation);
2976 }
2977 }
2978 }
2979 CoordinationPhase::Succeeded => {
2980 actions.push(TimeoutAction::Complete);
2981 }
2982 CoordinationPhase::Failed => {
2983 actions.push(TimeoutAction::Failed);
2984 }
2985 _ => {}
2986 }
2987 }
2988
2989 let mut expired_validations = Vec::new();
2991 for (addr, validation) in &mut self.active_validations {
2992 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2993 if now >= timeout_at {
2994 validation.retry_count += 1;
2995 if validation.retry_count >= validation.max_retries {
2996 debug!("Path validation failed for {}: max retries exceeded", addr);
2997 expired_validations.push(*addr);
2998 } else {
2999 debug!(
3000 "Path validation timeout for {}, retrying ({}/{})",
3001 addr, validation.retry_count, validation.max_retries
3002 );
3003 validation.sent_at = now;
3004 validation.last_retry_at = Some(now);
3005 actions.push(TimeoutAction::StartValidation);
3006 }
3007 }
3008 }
3009
3010 for addr in expired_validations {
3012 self.active_validations.remove(&addr);
3013 }
3014
3015 if self.resource_manager.should_cleanup(now) {
3017 self.resource_manager.perform_cleanup(now);
3018 }
3019
3020 self.network_monitor.update_quality_score(now);
3022
3023 if self.coordination.is_none()
3025 && !self.local_candidates.is_empty()
3026 && !self.remote_candidates.is_empty()
3027 {
3028 actions.push(TimeoutAction::RetryDiscovery);
3029 }
3030
3031 Ok(actions)
3032 }
3033
3034 pub(super) fn handle_address_observation(
3040 &mut self,
3041 peer_id: [u8; 32],
3042 observed_address: SocketAddr,
3043 connection_id: crate::shared::ConnectionId,
3044 now: Instant,
3045 ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
3046 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3047 let connection_context = ConnectionContext {
3048 connection_id,
3049 original_destination: observed_address, };
3052
3053 bootstrap_coordinator.observe_peer_address(
3055 peer_id,
3056 observed_address,
3057 connection_context,
3058 now,
3059 )?;
3060
3061 let sequence = self.next_sequence;
3063 self.next_sequence =
3064 VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3065
3066 let priority = VarInt::from_u32(100); let add_address_frame =
3068 bootstrap_coordinator.generate_add_address_frame(peer_id, sequence, priority);
3069
3070 Ok(add_address_frame)
3071 } else {
3072 Ok(None)
3074 }
3075 }
3076
3077 pub(super) fn handle_punch_me_now_frame(
3082 &mut self,
3083 from_peer: [u8; 32],
3084 source_addr: SocketAddr,
3085 frame: &crate::frame::PunchMeNow,
3086 now: Instant,
3087 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3088 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3089 bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3090 } else {
3091 Ok(None)
3093 }
3094 }
3095 pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3099 self.bootstrap_coordinator
3100 .as_ref()
3101 .and_then(|coord| coord.peer_index.get(&peer_id).map(|p| p.observed_addr))
3102 }
3103
3104 pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3106 debug!("Starting candidate discovery for NAT traversal");
3107 if self.local_candidates.is_empty() {
3109 debug!("Local candidates will be populated by discovery manager");
3112 }
3113
3114 Ok(())
3115 }
3116
3117 pub(super) fn queue_add_address_frame(
3119 &mut self,
3120 sequence: VarInt,
3121 address: SocketAddr,
3122 priority: u32,
3123 ) -> Result<(), NatTraversalError> {
3124 debug!(
3125 "Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3126 sequence, address, priority
3127 );
3128
3129 let candidate = AddressCandidate {
3131 address,
3132 priority,
3133 source: CandidateSource::Local,
3134 discovered_at: Instant::now(),
3135 state: CandidateState::New,
3136 attempt_count: 0,
3137 last_attempt: None,
3138 };
3139
3140 if !self.local_candidates.values().any(|c| c.address == address) {
3142 self.local_candidates.insert(sequence, candidate);
3143 }
3144
3145 Ok(())
3146 }
3147}
3148
/// Errors reported by the NAT traversal operations in this module.
///
/// Each variant has a short human-readable description provided by the
/// `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub(crate) enum NatTraversalError {
    /// Too many candidates (displayed as "too many candidates").
    TooManyCandidates,
    /// An address was supplied more than once ("duplicate address").
    DuplicateAddress,
    /// A referenced candidate is not known ("unknown candidate").
    UnknownCandidate,
    /// A candidate is in a state that does not permit the operation
    /// ("invalid candidate state").
    InvalidCandidateState,
    /// No path validation is in progress ("no active validation").
    NoActiveValidation,
    /// A challenge value did not match ("challenge mismatch").
    ChallengeMismatch,
    /// No coordination round is in progress ("no active coordination").
    NoActiveCoordination,
    /// Security validation rejected the input; returned, for example, when an
    /// observed address is classified as suspicious.
    SecurityValidationFailed,
    /// A rate limit was hit; returned, for example, when candidate
    /// observations or PUNCH_ME_NOW frames arrive too quickly.
    RateLimitExceeded,
    /// An address failed validation; returned, for example, when an observed
    /// address is classified as invalid.
    InvalidAddress,
    /// A coordination request looked suspicious
    /// ("suspicious coordination request").
    SuspiciousCoordination,
    /// A resource limit was exceeded ("resource limit exceeded").
    ResourceLimitExceeded,
}
3178impl std::fmt::Display for NatTraversalError {
3179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3180 match self {
3181 Self::TooManyCandidates => write!(f, "too many candidates"),
3182 Self::DuplicateAddress => write!(f, "duplicate address"),
3183 Self::UnknownCandidate => write!(f, "unknown candidate"),
3184 Self::InvalidCandidateState => write!(f, "invalid candidate state"),
3185 Self::NoActiveValidation => write!(f, "no active validation"),
3186 Self::ChallengeMismatch => write!(f, "challenge mismatch"),
3187 Self::NoActiveCoordination => write!(f, "no active coordination"),
3188 Self::SecurityValidationFailed => write!(f, "security validation failed"),
3189 Self::RateLimitExceeded => write!(f, "rate limit exceeded"),
3190 Self::InvalidAddress => write!(f, "invalid address"),
3191 Self::SuspiciousCoordination => write!(f, "suspicious coordination request"),
3192 Self::ResourceLimitExceeded => write!(f, "resource limit exceeded"),
3193 }
3194 }
3195}
3196
// Marker impl: the default `std::error::Error` methods are used, so the error
// carries no source and relies on its `Debug` and `Display` implementations.
impl std::error::Error for NatTraversalError {}
3198
/// Plain data holder for security-related counters and gauges.
///
/// NOTE(review): the code that fills this structure is outside this part of
/// the file; the per-field descriptions below are derived from the field names
/// and should be confirmed against the producer.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct SecurityStats {
    /// Presumably the total number of inputs rejected for security reasons.
    pub total_security_rejections: u32,
    /// Presumably the number of rate-limit violations seen.
    pub rate_limit_violations: u32,
    /// Presumably the number of rejections caused by invalid addresses.
    pub invalid_address_rejections: u32,
    /// Presumably the number of coordination attempts flagged as suspicious.
    pub suspicious_coordination_attempts: u32,
    /// Presumably the number of validations currently in progress.
    pub active_validations: usize,
    /// Presumably the number of cached address-validation results.
    pub cached_address_validations: usize,
    /// Presumably the current candidate rate used for rate limiting.
    pub current_candidate_rate: usize,
    /// Presumably the current coordination rate used for rate limiting.
    pub current_coordination_rate: usize,
}
/// State kept by a node that observes peer addresses and relays PUNCH_ME_NOW frames.
#[derive(Debug)]
pub(crate) struct BootstrapCoordinator {
    /// Observation records keyed by the observed socket address.
    address_observations: HashMap<SocketAddr, AddressObservation>,
    /// Most recently observed address for each peer, keyed by peer id.
    peer_index: HashMap<PeerId, ObservedPeer>,
    /// Coordination entries keyed by the round number of a PUNCH_ME_NOW frame.
    coordination_table: HashMap<VarInt, CoordinationEntry>,
    /// Validator used for address checks and rate limiting.
    security_validator: SecurityValidationState,
    /// Counters describing the coordinator's activity.
    stats: BootstrapStats,
}
/// Peer identifier: a fixed 32-byte value.
type PeerId = [u8; 32];
/// Per-peer record stored in the coordinator's peer index.
#[derive(Debug, Clone)]
struct ObservedPeer {
    /// Address the peer was most recently observed at.
    observed_addr: SocketAddr,
}
3245
/// Entry of the coordination table, created when a PUNCH_ME_NOW frame for a round is processed.
#[derive(Debug, Clone)]
struct CoordinationEntry {
    /// Target peer named by a frame for this round, if any frame named one.
    peer_b: Option<PeerId>,
    /// Address carried by the frame that last updated this entry.
    address_hint: SocketAddr,
}
/// Detailed observation record for a peer.
///
/// NOTE(review): nothing in the visible part of the file constructs this type
/// (`get_peer_record` always returns `None`); the field descriptions are
/// derived from the field names and should be confirmed.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct PeerObservationRecord {
    /// Identifier of the observed peer.
    peer_id: PeerId,
    /// Address the peer was observed at.
    observed_address: SocketAddr,
    /// Time of the observation.
    observed_at: Instant,
    /// Context of the connection the observation was made on.
    connection_context: ConnectionContext,
    /// Presumably whether the peer is eligible for coordination.
    can_coordinate: bool,
    /// Presumably the number of coordinations involving this peer.
    coordination_count: u32,
    /// Presumably the fraction of coordinations that succeeded.
    success_rate: f64,
}
3271
/// Connection details that accompany an address observation.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct ConnectionContext {
    /// Identifier of the connection the observation was made on.
    connection_id: ConnectionId,
    /// Destination address associated with the connection; the caller in this
    /// file fills it with the observed address itself.
    original_destination: SocketAddr,
}
3284
/// Aggregated observations of a single socket address.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct AddressObservation {
    /// The observed address (also the key of the observation map).
    address: SocketAddr,
    /// Time at which the address was first observed.
    first_observed: Instant,
    /// Number of accepted observations of this address.
    observation_count: u32,
    /// Validation result stored when the record was created.
    validation_state: AddressValidationResult,
    /// Distinct peers that have been observed at this address.
    associated_peers: Vec<PeerId>,
}
3302
/// Configuration for [`BootstrapCoordinator`]; currently carries no settings.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapConfig {
    /// Placeholder field; the struct has no real options yet.
    _unused: (),
}
/// Activity counters maintained by [`BootstrapCoordinator`].
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapStats {
    /// Incremented for every accepted peer address observation.
    total_observations: u64,
    /// Incremented for every PUNCH_ME_NOW frame that names a target peer and
    /// is turned into a relay frame.
    total_coordinations: u64,
    /// Incremented for every accepted PUNCH_ME_NOW frame that names no target
    /// peer.
    successful_coordinations: u64,
    /// Incremented whenever an observation or frame is rejected by a security
    /// or rate-limit check.
    security_rejections: u64,
}
3322impl BootstrapCoordinator {
3324 pub(crate) fn new(_config: BootstrapConfig) -> Self {
3326 Self {
3327 address_observations: HashMap::new(),
3328 peer_index: HashMap::new(),
3329 coordination_table: HashMap::new(),
3330 security_validator: SecurityValidationState::new(),
3331 stats: BootstrapStats::default(),
3332 }
3333 }
3334 pub(crate) fn observe_peer_address(
3339 &mut self,
3340 peer_id: PeerId,
3341 observed_address: SocketAddr,
3342 _connection_context: ConnectionContext,
3343 now: Instant,
3344 ) -> Result<(), NatTraversalError> {
3345 match self
3347 .security_validator
3348 .validate_address(observed_address, now)
3349 {
3350 AddressValidationResult::Valid => {}
3351 AddressValidationResult::Invalid => {
3352 self.stats.security_rejections += 1;
3353 return Err(NatTraversalError::InvalidAddress);
3354 }
3355 AddressValidationResult::Suspicious => {
3356 self.stats.security_rejections += 1;
3357 return Err(NatTraversalError::SecurityValidationFailed);
3358 }
3359 }
3360
3361 if self.security_validator.is_candidate_rate_limited(now) {
3363 self.stats.security_rejections += 1;
3364 return Err(NatTraversalError::RateLimitExceeded);
3365 }
3366
3367 let observation = self
3369 .address_observations
3370 .entry(observed_address)
3371 .or_insert_with(|| AddressObservation {
3372 address: observed_address,
3373 first_observed: now,
3374 observation_count: 0,
3375 validation_state: AddressValidationResult::Valid,
3376 associated_peers: Vec::new(),
3377 });
3378
3379 observation.observation_count += 1;
3380 if !observation.associated_peers.contains(&peer_id) {
3381 observation.associated_peers.push(peer_id);
3382 }
3383
3384 self.peer_index.insert(
3386 peer_id,
3387 ObservedPeer {
3388 observed_addr: observed_address,
3389 },
3390 );
3391
3392 self.stats.total_observations += 1;
3394 debug!(
3397 "Observed peer {:?} at address {} (total observations: {})",
3398 peer_id, observed_address, self.stats.total_observations
3399 );
3400
3401 Ok(())
3402 }
3403
3404 pub(crate) fn generate_add_address_frame(
3409 &self,
3410 peer_id: PeerId,
3411 sequence: VarInt,
3412 priority: VarInt,
3413 ) -> Option<crate::frame::AddAddress> {
3414 let addr = self.peer_index.get(&peer_id)?.observed_addr;
3415 Some(crate::frame::AddAddress {
3416 sequence,
3417 address: addr,
3418 priority,
3419 })
3420 }
3421
3422 pub(crate) fn process_punch_me_now_frame(
3427 &mut self,
3428 from_peer: PeerId,
3429 source_addr: SocketAddr,
3430 frame: &crate::frame::PunchMeNow,
3431 now: Instant,
3432 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3433 if self
3435 .security_validator
3436 .is_adaptive_rate_limited(from_peer, now)
3437 {
3438 self.stats.security_rejections += 1;
3439 debug!(
3440 "PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3441 hex::encode(&from_peer[..8])
3442 );
3443 return Err(NatTraversalError::RateLimitExceeded);
3444 }
3445 self.security_validator
3447 .enhanced_address_validation(frame.address, source_addr, now)
3448 .inspect_err(|&e| {
3449 self.stats.security_rejections += 1;
3450 debug!(
3451 "PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3452 hex::encode(&from_peer[..8]),
3453 e
3454 );
3455 })?;
3456
3457 self.security_validator
3459 .validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3460 .inspect_err(|&e| {
3461 self.stats.security_rejections += 1;
3462 debug!(
3463 "PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3464 hex::encode(&from_peer[..8]),
3465 e
3466 );
3467 })?;
3468
3469 let _entry = self
3471 .coordination_table
3472 .entry(frame.round)
3473 .or_insert(CoordinationEntry {
3474 peer_b: frame.target_peer_id,
3475 address_hint: frame.address,
3476 });
3477 if let Some(peer_b) = frame.target_peer_id {
3479 if _entry.peer_b.is_none() {
3480 _entry.peer_b = Some(peer_b);
3481 }
3482 _entry.address_hint = frame.address;
3483 }
3484
3485 if let Some(_target_peer_id) = frame.target_peer_id {
3487 let coordination_frame = crate::frame::PunchMeNow {
3488 round: frame.round,
3489 paired_with_sequence_number: frame.paired_with_sequence_number,
3490 address: frame.address,
3491 target_peer_id: Some(from_peer),
3492 };
3493 self.stats.total_coordinations += 1;
3494 Ok(Some(coordination_frame))
3495 } else {
3496 self.stats.successful_coordinations += 1;
3498 Ok(None)
3499 }
3500 }
3501
3502 #[allow(dead_code)]
3508 pub(crate) fn cleanup_expired_sessions(&mut self, _now: Instant) {}
3509
3510 #[allow(dead_code)]
3515 pub(crate) fn poll_session_state_machine(&mut self, _now: Instant) -> Vec<()> {
3516 Vec::new()
3518 }
3519
3520 #[allow(dead_code)]
3524 fn cleanup_completed_sessions(&mut self, _now: Instant) {}
3525
3526 #[allow(dead_code)]
3531 fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
3532 let _ = peer_id;
3535 None
3536 }
3537 #[allow(dead_code)]
3558 pub(crate) fn get_peer_record(&self, _peer_id: PeerId) -> Option<&PeerObservationRecord> {
3559 None
3561 }
3562}
3563
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the state shared by all tests in this module.
    fn create_test_state() -> NatTraversalState {
        // presumably (max_candidates, coordination_timeout) — confirm against
        // `NatTraversalState::new`.
        NatTraversalState::new(10, Duration::from_secs(30))
    }

    /// A single observed address becomes a new local candidate with non-zero priority.
    #[test]
    fn test_add_quic_discovered_address() {
        let mut nat = create_test_state();
        let started = Instant::now();

        let observed = SocketAddr::from(([1, 2, 3, 4], 5678));
        let origin = CandidateSource::Observed { by_node: None };
        let seq = nat.add_local_candidate(observed, origin, started);

        assert_eq!(nat.local_candidates.len(), 1);

        let stored = nat.local_candidates.get(&seq).unwrap();
        assert_eq!(stored.address, observed);
        assert!(matches!(stored.source, CandidateSource::Observed { .. }));
        assert_eq!(stored.state, CandidateState::New);
        assert!(stored.priority > 0);
    }

    /// Several observed addresses (IPv4 and IPv6) are all stored under their own sequence.
    #[test]
    fn test_add_multiple_quic_discovered_addresses() {
        let mut nat = create_test_state();
        let started = Instant::now();

        let observed_addrs = vec![
            SocketAddr::from(([1, 2, 3, 4], 5678)),
            SocketAddr::from(([5, 6, 7, 8], 9012)),
            SocketAddr::from(([2001, 0xdb8, 0, 0, 0, 0, 0, 1], 443)),
        ];

        let assigned: Vec<_> = observed_addrs
            .iter()
            .map(|addr| {
                nat.add_local_candidate(*addr, CandidateSource::Observed { by_node: None }, started)
            })
            .collect();

        assert_eq!(nat.local_candidates.len(), 3);

        for (seq, expected_addr) in assigned.iter().zip(observed_addrs.iter()) {
            let stored = nat.local_candidates.get(seq).unwrap();
            assert_eq!(stored.address, *expected_addr);
            assert!(matches!(stored.source, CandidateSource::Observed { .. }));
        }
    }

    /// An observed address is retrievable from the local candidate table.
    #[test]
    fn test_quic_discovered_addresses_in_local_candidates() {
        let mut nat = create_test_state();
        let started = Instant::now();

        let observed = SocketAddr::from(([192, 168, 1, 100], 5000));
        let seq =
            nat.add_local_candidate(observed, CandidateSource::Observed { by_node: None }, started);

        assert!(nat.local_candidates.contains_key(&seq));

        let stored = nat.local_candidates.get(&seq).unwrap();
        assert_eq!(stored.address, observed);
        assert!(matches!(stored.source, CandidateSource::Observed { .. }));
    }

    /// An observed local address is paired with a remote candidate.
    #[test]
    fn test_quic_discovered_addresses_included_in_hole_punching() {
        let mut nat = create_test_state();
        let started = Instant::now();

        let local = SocketAddr::from(([192, 168, 1, 100], 5000));
        nat.add_local_candidate(local, CandidateSource::Observed { by_node: None }, started);

        let remote = SocketAddr::from(([1, 2, 3, 4], 6000));
        let remote_priority = VarInt::from_u32(100);
        nat.add_remote_candidate(VarInt::from_u32(1), remote, remote_priority, started)
            .expect("add remote candidate should succeed");

        nat.generate_candidate_pairs(started);

        assert_eq!(nat.candidate_pairs.len(), 1);

        let only_pair = &nat.candidate_pairs[0];
        assert_eq!(only_pair.local_addr, local);
        assert_eq!(only_pair.remote_addr, remote);
    }

    /// An observed candidate is prioritized at least as high as a predicted one.
    #[test]
    fn test_prioritize_quic_discovered_over_predicted() {
        let mut nat = create_test_state();
        let started = Instant::now();

        let predicted = SocketAddr::from(([1, 2, 3, 4], 5000));
        let predicted_seq = nat.add_local_candidate(predicted, CandidateSource::Predicted, started);

        let observed = SocketAddr::from(([1, 2, 3, 4], 5001));
        let observed_seq =
            nat.add_local_candidate(observed, CandidateSource::Observed { by_node: None }, started);

        let predicted_priority = nat.local_candidates.get(&predicted_seq).unwrap().priority;
        let observed_priority = nat.local_candidates.get(&observed_seq).unwrap().priority;

        assert!(observed_priority >= predicted_priority);
    }

    /// Two local and two remote candidates yield four pairs, two of them using the observed address.
    #[test]
    fn test_integration_with_nat_traversal_flow() {
        let mut nat = create_test_state();
        let started = Instant::now();

        let host_addr = SocketAddr::from(([192, 168, 1, 2], 5000));
        nat.add_local_candidate(host_addr, CandidateSource::Local, started);

        let observed = SocketAddr::from(([44, 55, 66, 77], 5000));
        nat.add_local_candidate(observed, CandidateSource::Observed { by_node: None }, started);

        let first_remote = SocketAddr::from(([93, 184, 215, 123], 6000));
        let second_remote = SocketAddr::from(([172, 217, 16, 34], 7000));
        let remote_priority = VarInt::from_u32(100);
        nat.add_remote_candidate(VarInt::from_u32(1), first_remote, remote_priority, started)
            .expect("add remote candidate should succeed");
        nat.add_remote_candidate(VarInt::from_u32(2), second_remote, remote_priority, started)
            .expect("add remote candidate should succeed");

        nat.generate_candidate_pairs(started);

        assert_eq!(nat.candidate_pairs.len(), 4);

        let pairs_using_observed = nat
            .candidate_pairs
            .iter()
            .filter(|pair| pair.local_addr == observed)
            .count();
        assert_eq!(pairs_using_observed, 2);
    }
}