1use std::{
9 collections::{HashMap, VecDeque},
10 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
11 time::Duration,
12};
13
14use crate::shared::ConnectionId;
15use tracing::{debug, info, trace, warn};
16
17use crate::{Instant, VarInt};
18
19#[derive(Debug)]
27pub(super) struct NatTraversalState {
28 pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
31 pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
33 pub(super) candidate_pairs: Vec<CandidatePair>,
35 pub(super) pair_index: HashMap<SocketAddr, usize>,
37 pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
39 pub(super) coordination: Option<CoordinationState>,
41 pub(super) next_sequence: VarInt,
43 pub(super) max_candidates: u32,
45 pub(super) coordination_timeout: Duration,
47 pub(super) stats: NatTraversalStats,
49 pub(super) security_state: SecurityValidationState,
51 pub(super) network_monitor: NetworkConditionMonitor,
53 pub(super) resource_manager: ResourceCleanupCoordinator,
55 pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
57}
58#[derive(Debug, Clone)]
62pub(super) struct AddressCandidate {
63 pub(super) address: SocketAddr,
65 pub(super) priority: u32,
67 pub(super) source: CandidateSource,
69 pub(super) discovered_at: Instant,
71 pub(super) state: CandidateState,
73 pub(super) attempt_count: u32,
75 pub(super) last_attempt: Option<Instant>,
77}
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub enum CandidateSource {
81 Local,
83 Observed {
88 by_node: Option<VarInt>,
90 },
91 Peer,
93 Predicted,
95}
96#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum CandidateState {
99 New,
101 Validating,
103 Valid,
105 Failed,
107 Removed,
109}
110#[derive(Debug)]
112#[allow(dead_code)]
113pub(super) struct PathValidationState {
114 pub(super) challenge: u64,
116 pub(super) sent_at: Instant,
118 pub(super) retry_count: u32,
120 pub(super) max_retries: u32,
122 pub(super) coordination_round: Option<VarInt>,
124 pub(super) timeout_state: AdaptiveTimeoutState,
126 pub(super) last_retry_at: Option<Instant>,
128}
129#[derive(Debug)]
131#[allow(dead_code)]
132pub(super) struct CoordinationState {
133 pub(super) round: VarInt,
135 pub(super) punch_targets: Vec<PunchTarget>,
137 pub(super) round_start: Instant,
139 pub(super) punch_start: Instant,
141 pub(super) round_duration: Duration,
143 pub(super) state: CoordinationPhase,
145 pub(super) punch_request_sent: bool,
147 pub(super) peer_punch_received: bool,
149 pub(super) retry_count: u32,
151 pub(super) max_retries: u32,
153 pub(super) timeout_state: AdaptiveTimeoutState,
155 pub(super) last_retry_at: Option<Instant>,
157}
158#[derive(Debug, Clone, Copy, PartialEq, Eq)]
160#[allow(dead_code)]
161pub(crate) enum CoordinationPhase {
162 Idle,
164 Requesting,
166 Coordinating,
168 Preparing,
170 Punching,
172 Validating,
174 Succeeded,
176 Failed,
178}
179#[derive(Debug, Clone)]
181pub(super) struct PunchTarget {
182 pub(super) remote_addr: SocketAddr,
184 pub(super) remote_sequence: VarInt,
186 pub(super) challenge: u64,
188}
189#[derive(Debug, Clone, PartialEq, Eq)]
191pub(super) enum TimeoutAction {
192 RetryDiscovery,
194 RetryCoordination,
196 StartValidation,
198 Complete,
200 Failed,
202}
203
204#[derive(Debug, Clone)]
206#[allow(dead_code)]
207pub(super) struct CandidatePair {
208 pub(super) remote_sequence: VarInt,
210 pub(super) local_addr: SocketAddr,
212 pub(super) remote_addr: SocketAddr,
214 pub(super) priority: u64,
216 pub(super) state: PairState,
218 pub(super) pair_type: PairType,
220 pub(super) created_at: Instant,
222 pub(super) last_check: Option<Instant>,
224}
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227#[allow(dead_code)]
228pub(super) enum PairState {
229 Waiting,
231 Succeeded,
233 Failed,
235 Frozen,
237}
238#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
240pub(super) enum PairType {
241 HostToHost,
243 HostToServerReflexive,
245 ServerReflexiveToHost,
247 ServerReflexiveToServerReflexive,
249 PeerReflexive,
251}
252#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254pub(super) enum CandidateType {
255 Host,
257 ServerReflexive,
259 PeerReflexive,
261}
262
263#[allow(dead_code)]
266fn calculate_candidate_priority(
267 candidate_type: CandidateType,
268 local_preference: u16,
269 component_id: u8,
270) -> u32 {
271 let type_preference = match candidate_type {
272 CandidateType::Host => 126,
273 CandidateType::PeerReflexive => 110,
274 CandidateType::ServerReflexive => 100,
275 };
276 (1u32 << 24) * type_preference + (1u32 << 8) * local_preference as u32 + component_id as u32
278}
279
280fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
283 let g = local_priority as u64;
284 let d = remote_priority as u64;
285 (1u64 << 32) * g.min(d) + 2 * g.max(d) + if g > d { 1 } else { 0 }
287}
288
289fn classify_candidate_type(source: CandidateSource) -> CandidateType {
291 match source {
292 CandidateSource::Local => CandidateType::Host,
293 CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
294 CandidateSource::Peer => CandidateType::PeerReflexive,
295 CandidateSource::Predicted => CandidateType::ServerReflexive, }
297}
298fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
300 match (local_type, remote_type) {
301 (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
302 (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
303 (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
304 (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => {
305 PairType::ServerReflexiveToServerReflexive
306 }
307 (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => {
308 PairType::PeerReflexive
309 }
310 }
311}
312fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
314 match (local.address, remote.address) {
316 (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
317 (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
318 _ => false, }
320}
321#[derive(Debug, Default, Clone)]
323#[allow(dead_code)]
324pub(crate) struct NatTraversalStats {
325 pub(super) remote_candidates_received: u32,
327 pub(super) local_candidates_sent: u32,
329 pub(super) validations_succeeded: u32,
331 pub(super) validations_failed: u32,
333 pub(super) coordination_rounds: u32,
335 pub(super) successful_coordinations: u32,
337 pub(super) failed_coordinations: u32,
339 pub(super) timed_out_coordinations: u32,
341 pub(super) coordination_failures: u32,
343 pub(super) direct_connections: u32,
345 pub(super) security_rejections: u32,
347 pub(super) rate_limit_violations: u32,
349 pub(super) invalid_address_rejections: u32,
351 pub(super) suspicious_coordination_attempts: u32,
353 pub(super) callback_probes_received: u32,
355 pub(super) callback_probes_successful: u32,
357 pub(super) callback_probes_failed: u32,
359}
360#[derive(Debug)]
362#[allow(dead_code)]
363pub(super) struct SecurityValidationState {
364 candidate_rate_tracker: VecDeque<Instant>,
366 max_candidates_per_window: u32,
368 rate_window: Duration,
370 coordination_requests: VecDeque<CoordinationRequest>,
372 max_coordination_per_window: u32,
374 address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
376 validation_cache_timeout: Duration,
378}
379#[derive(Debug, Clone)]
381struct CoordinationRequest {
382 timestamp: Instant,
384}
385#[derive(Debug, Clone, Copy, PartialEq, Eq)]
387enum AddressValidationResult {
388 Valid,
390 Invalid,
392 Suspicious,
394}
395#[derive(Debug, Clone)]
397pub(super) struct AdaptiveTimeoutState {
398 current_timeout: Duration,
400 min_timeout: Duration,
402 max_timeout: Duration,
404 base_timeout: Duration,
406 backoff_multiplier: f64,
408 max_backoff_multiplier: f64,
410 jitter_factor: f64,
412 srtt: Option<Duration>,
414 rttvar: Option<Duration>,
416 last_rtt: Option<Duration>,
418 consecutive_timeouts: u32,
420 successful_responses: u32,
422}
423#[derive(Debug)]
425#[allow(dead_code)]
426pub(super) struct NetworkConditionMonitor {
427 rtt_samples: VecDeque<Duration>,
429 max_samples: usize,
431 packet_loss_rate: f64,
433 congestion_window: u32,
435 quality_score: f64,
437 last_quality_update: Instant,
439 quality_update_interval: Duration,
441 timeout_stats: TimeoutStatistics,
443}
444#[derive(Debug, Default)]
446struct TimeoutStatistics {
447 total_timeouts: u64,
449 total_responses: u64,
451 avg_response_time: Duration,
453 timeout_rate: f64,
455 last_update: Option<Instant>,
457}
458#[allow(dead_code)]
459impl SecurityValidationState {
460 fn new() -> Self {
462 Self {
463 candidate_rate_tracker: VecDeque::new(),
464 max_candidates_per_window: 20, rate_window: Duration::from_secs(60),
466 coordination_requests: VecDeque::new(),
467 max_coordination_per_window: 5, address_validation_cache: HashMap::new(),
469 validation_cache_timeout: Duration::from_secs(300), }
471 }
472 fn new_with_limits(
474 max_candidates_per_window: u32,
475 max_coordination_per_window: u32,
476 rate_window: Duration,
477 ) -> Self {
478 Self {
479 candidate_rate_tracker: VecDeque::new(),
480 max_candidates_per_window,
481 rate_window,
482 coordination_requests: VecDeque::new(),
483 max_coordination_per_window,
484 address_validation_cache: HashMap::new(),
485 validation_cache_timeout: Duration::from_secs(300),
486 }
487 }
488 fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
493 self.cleanup_rate_tracker(now);
495 self.cleanup_coordination_tracker(now);
496 let _current_candidate_rate =
498 self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
499 let _current_coordination_rate =
500 self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
501
502 let peer_reputation = self.calculate_peer_reputation(peer_id);
504 let adaptive_candidate_limit =
505 (self.max_candidates_per_window as f64 * peer_reputation) as u32;
506 let adaptive_coordination_limit =
507 (self.max_coordination_per_window as f64 * peer_reputation) as u32;
508
509 if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
511 debug!(
512 "Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
513 hex::encode(&peer_id[..8]),
514 self.candidate_rate_tracker.len(),
515 adaptive_candidate_limit
516 );
517 return true;
518 }
519
520 if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
521 debug!(
522 "Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
523 hex::encode(&peer_id[..8]),
524 self.coordination_requests.len(),
525 adaptive_coordination_limit
526 );
527 return true;
528 }
529
530 false
531 }
532
533 fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
538 1.0
547 }
548
549 fn validate_amplification_limits(
554 &mut self,
555 source_addr: SocketAddr,
556 target_addr: SocketAddr,
557 now: Instant,
558 ) -> Result<(), NatTraversalError> {
559 let amplification_key = (source_addr, target_addr);
561 if self.is_amplification_suspicious(amplification_key, now) {
570 warn!(
571 "Potential amplification attack detected: {} -> {}",
572 source_addr, target_addr
573 );
574 return Err(NatTraversalError::SuspiciousCoordination);
575 }
576
577 Ok(())
578 }
579
580 fn is_amplification_suspicious(
582 &self,
583 _amplification_key: (SocketAddr, SocketAddr),
584 _now: Instant,
585 ) -> bool {
586 false
595 }
596
597 fn generate_secure_coordination_round(&self) -> VarInt {
602 let secure_random: u64 = rand::random();
604 let bounded_random = secure_random % 1000000; VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
608 }
609
610 fn enhanced_address_validation(
618 &mut self,
619 addr: SocketAddr,
620 source_addr: SocketAddr,
621 now: Instant,
622 ) -> Result<AddressValidationResult, NatTraversalError> {
623 let basic_result = self.validate_address(addr, now);
625 match basic_result {
626 AddressValidationResult::Invalid => {
627 return Err(NatTraversalError::InvalidAddress);
628 }
629 AddressValidationResult::Suspicious => {
630 return Err(NatTraversalError::SuspiciousCoordination);
631 }
632 AddressValidationResult::Valid => {
633 }
635 }
636
637 self.validate_amplification_limits(source_addr, addr, now)?;
639
640 if self.is_address_in_suspicious_range(addr) {
642 warn!("Address in suspicious range detected: {}", addr);
643 return Err(NatTraversalError::SuspiciousCoordination);
644 }
645
646 if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
647 warn!(
648 "Suspicious coordination pattern detected: {} -> {}",
649 source_addr, addr
650 );
651 return Err(NatTraversalError::SuspiciousCoordination);
652 }
653
654 Ok(AddressValidationResult::Valid)
655 }
656
657 fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
659 match addr.ip() {
660 IpAddr::V4(ipv4) => {
661 let octets = ipv4.octets();
663 if octets[0] == 0 || octets[0] == 127 {
665 return true;
666 }
667
668 if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
670 return true;
671 }
672 if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
673 return true;
674 }
675 if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
676 return true;
677 }
678
679 false
680 }
681 IpAddr::V6(ipv6) => {
682 if ipv6.is_loopback() || ipv6.is_unspecified() {
684 return true;
685 }
686
687 let segments = ipv6.segments();
689 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
690 return true;
691 }
692
693 false
694 }
695 }
696 }
697
698 fn is_coordination_pattern_suspicious(
700 &self,
701 _source_addr: SocketAddr,
702 _target_addr: SocketAddr,
703 _now: Instant,
704 ) -> bool {
705 false
714 }
715
716 fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
718 self.cleanup_rate_tracker(now);
720 if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
722 return true;
723 }
724
725 self.candidate_rate_tracker.push_back(now);
727 false
728 }
729
730 fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
732 self.cleanup_coordination_tracker(now);
734 if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
736 return true;
737 }
738
739 let request = CoordinationRequest { timestamp: now };
741 self.coordination_requests.push_back(request);
742 false
743 }
744
745 fn cleanup_rate_tracker(&mut self, now: Instant) {
747 let cutoff = now - self.rate_window;
748 while let Some(&front_time) = self.candidate_rate_tracker.front() {
749 if front_time < cutoff {
750 self.candidate_rate_tracker.pop_front();
751 } else {
752 break;
753 }
754 }
755 }
756 fn cleanup_coordination_tracker(&mut self, now: Instant) {
758 let cutoff = now - self.rate_window;
759 while let Some(front_request) = self.coordination_requests.front() {
760 if front_request.timestamp < cutoff {
761 self.coordination_requests.pop_front();
762 } else {
763 break;
764 }
765 }
766 }
767 fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
769 if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
771 return cached_result;
772 }
773 let result = self.perform_address_validation(addr);
774
775 self.address_validation_cache.insert(addr, result);
777
778 if self.address_validation_cache.len() > 1000 {
780 self.cleanup_address_cache(now);
781 }
782
783 result
784 }
785
786 fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
788 match addr.ip() {
789 IpAddr::V4(ipv4) => {
790 if ipv4.is_unspecified() || ipv4.is_broadcast() {
792 return AddressValidationResult::Invalid;
793 }
794 if ipv4.is_multicast() || ipv4.is_documentation() {
796 return AddressValidationResult::Suspicious;
797 }
798
799 if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
801 return AddressValidationResult::Invalid;
802 }
803
804 if self.is_suspicious_ipv4(ipv4) {
806 return AddressValidationResult::Suspicious;
807 }
808 }
809 IpAddr::V6(ipv6) => {
810 if ipv6.is_unspecified() || ipv6.is_multicast() {
812 return AddressValidationResult::Invalid;
813 }
814
815 if self.is_suspicious_ipv6(ipv6) {
817 return AddressValidationResult::Suspicious;
818 }
819 }
820 }
821
822 if addr.port() == 0 || addr.port() < 1024 {
824 return AddressValidationResult::Suspicious;
825 }
826
827 AddressValidationResult::Valid
828 }
829
830 fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
832 let octets = ipv4.octets();
833 if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
836 return true;
837 }
838
839 false
842 }
843
844 fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
846 let segments = ipv6.segments();
847 if segments.iter().all(|&s| s == segments[0]) {
849 return true;
850 }
851
852 false
853 }
854
855 fn cleanup_address_cache(&mut self, _now: Instant) {
857 if self.address_validation_cache.len() > 500 {
860 let keys_to_remove: Vec<_> = self
861 .address_validation_cache
862 .keys()
863 .take(self.address_validation_cache.len() / 2)
864 .copied()
865 .collect();
866 for key in keys_to_remove {
867 self.address_validation_cache.remove(&key);
868 }
869 }
870 }
871
872 fn validate_punch_me_now_frame(
880 &mut self,
881 frame: &crate::frame::PunchMeNow,
882 source_addr: SocketAddr,
883 peer_id: [u8; 32],
884 now: Instant,
885 ) -> Result<(), NatTraversalError> {
886 if self.is_coordination_rate_limited(now) {
888 debug!(
889 "PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}",
890 hex::encode(&peer_id[..8])
891 );
892 return Err(NatTraversalError::RateLimitExceeded);
893 }
894 let addr_validation = self.validate_address(frame.address, now);
896 match addr_validation {
897 AddressValidationResult::Invalid => {
898 debug!(
899 "PUNCH_ME_NOW frame rejected: invalid address {:?} from peer {:?}",
900 frame.address,
901 hex::encode(&peer_id[..8])
902 );
903 return Err(NatTraversalError::InvalidAddress);
904 }
905 AddressValidationResult::Suspicious => {
906 debug!(
907 "PUNCH_ME_NOW frame rejected: suspicious address {:?} from peer {:?}",
908 frame.address,
909 hex::encode(&peer_id[..8])
910 );
911 return Err(NatTraversalError::SuspiciousCoordination);
912 }
913 AddressValidationResult::Valid => {
914 }
916 }
917
918 if !self.validate_address_consistency(frame.address, source_addr) {
921 debug!(
922 "PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
923 frame.address, source_addr
924 );
925 return Err(NatTraversalError::SuspiciousCoordination);
926 }
927
928 if !self.validate_coordination_parameters(frame) {
930 debug!(
931 "PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
932 hex::encode(&peer_id[..8])
933 );
934 return Err(NatTraversalError::SuspiciousCoordination);
935 }
936
937 if let Some(target_peer_id) = frame.target_peer_id {
939 if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
940 debug!(
941 "PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
942 hex::encode(&peer_id[..8]),
943 hex::encode(&target_peer_id[..8])
944 );
945 return Err(NatTraversalError::SuspiciousCoordination);
946 }
947 }
948
949 if !self.validate_resource_limits(frame) {
951 debug!(
952 "PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
953 hex::encode(&peer_id[..8])
954 );
955 return Err(NatTraversalError::ResourceLimitExceeded);
956 }
957
958 debug!(
959 "PUNCH_ME_NOW frame validation passed for peer {:?}",
960 hex::encode(&peer_id[..8])
961 );
962 Ok(())
963 }
964
965 fn validate_address_consistency(
970 &self,
971 claimed_addr: SocketAddr,
972 observed_addr: SocketAddr,
973 ) -> bool {
974 match (claimed_addr.ip(), observed_addr.ip()) {
978 (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
979 if claimed_ip == observed_ip {
981 return true;
982 }
983
984 if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
986 return true;
987 }
988
989 !claimed_ip.is_private() && !observed_ip.is_private()
992 }
993 (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
994 claimed_ip == observed_ip || self.are_in_same_prefix_v6(claimed_ip, observed_ip)
996 }
997 _ => {
998 false
1000 }
1001 }
1002 }
1003
1004 fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
1006 let ip1_octets = ip1.octets();
1008 let ip2_octets = ip2.octets();
1009 if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
1011 return true;
1012 }
1013
1014 if ip1_octets[0] == 172
1016 && ip2_octets[0] == 172
1017 && (16..=31).contains(&ip1_octets[1])
1018 && (16..=31).contains(&ip2_octets[1])
1019 {
1020 return true;
1021 }
1022
1023 if ip1_octets[0] == 192
1025 && ip1_octets[1] == 168
1026 && ip2_octets[0] == 192
1027 && ip2_octets[1] == 168
1028 {
1029 return true;
1030 }
1031
1032 false
1033 }
1034
1035 fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1037 let segments1 = ip1.segments();
1039 let segments2 = ip2.segments();
1040 segments1[0] == segments2[0]
1041 && segments1[1] == segments2[1]
1042 && segments1[2] == segments2[2]
1043 && segments1[3] == segments2[3]
1044 }
1045
1046 fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1048 if frame.round.into_inner() > 1000000 {
1050 return false;
1051 }
1052 if frame.paired_with_sequence_number.into_inner() > 10000 {
1054 return false;
1055 }
1056
1057 match frame.address.ip() {
1059 IpAddr::V4(ipv4) => {
1060 !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1062 }
1063 IpAddr::V6(ipv6) => {
1064 !ipv6.is_unspecified() && !ipv6.is_multicast()
1066 }
1067 }
1068 }
1069
1070 fn validate_target_peer_request(
1072 &self,
1073 requesting_peer: [u8; 32],
1074 target_peer: [u8; 32],
1075 _frame: &crate::frame::PunchMeNow,
1076 ) -> bool {
1077 if requesting_peer == target_peer {
1079 return false;
1080 }
1081 true
1087 }
1088
1089 fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1091 self.coordination_requests.len() < self.max_coordination_per_window as usize
1099 }
1100}
1101
1102impl AdaptiveTimeoutState {
1103 pub(crate) fn new() -> Self {
1105 let base_timeout = Duration::from_millis(1000); Self {
1107 current_timeout: base_timeout,
1108 min_timeout: Duration::from_millis(100),
1109 max_timeout: Duration::from_secs(30),
1110 base_timeout,
1111 backoff_multiplier: 1.0,
1112 max_backoff_multiplier: 8.0,
1113 jitter_factor: 0.1, srtt: None,
1115 rttvar: None,
1116 last_rtt: None,
1117 consecutive_timeouts: 0,
1118 successful_responses: 0,
1119 }
1120 }
1121 fn update_success(&mut self, rtt: Duration) {
1123 self.last_rtt = Some(rtt);
1124 self.successful_responses += 1;
1125 self.consecutive_timeouts = 0;
1126 match self.srtt {
1128 None => {
1129 self.srtt = Some(rtt);
1130 self.rttvar = Some(rtt / 2);
1131 }
1132 Some(srtt) => {
1133 let rttvar = self.rttvar.unwrap_or(rtt / 2);
1134 let abs_diff = rtt.abs_diff(srtt);
1135
1136 self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1137 self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1138 }
1139 }
1140
1141 self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1143
1144 self.calculate_current_timeout();
1146 }
1147
1148 fn update_timeout(&mut self) {
1150 self.consecutive_timeouts += 1;
1151 self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1153
1154 self.calculate_current_timeout();
1156 }
1157
1158 fn calculate_current_timeout(&mut self) {
1160 let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1161 srtt + rttvar * 4
1163 } else {
1164 self.base_timeout
1165 };
1166 let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1168
1169 let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1171 let timeout = timeout.mul_f64(jitter);
1172
1173 self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1175 }
1176
1177 fn get_timeout(&self) -> Duration {
1179 self.current_timeout
1180 }
1181 fn should_retry(&self, max_retries: u32) -> bool {
1183 self.consecutive_timeouts < max_retries
1184 }
1185 fn get_retry_delay(&self) -> Duration {
1187 let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1188 delay.clamp(self.min_timeout, self.max_timeout)
1189 }
1190}
1191#[derive(Debug)]
1193#[allow(dead_code)]
1194pub(super) struct ResourceManagementConfig {
1195 max_active_validations: usize,
1197 max_local_candidates: usize,
1199 max_remote_candidates: usize,
1201 max_candidate_pairs: usize,
1203 max_coordination_history: usize,
1205 cleanup_interval: Duration,
1207 candidate_timeout: Duration,
1209 validation_timeout: Duration,
1211 coordination_timeout: Duration,
1213 memory_pressure_threshold: f64,
1215 aggressive_cleanup_threshold: f64,
1217}
1218#[derive(Debug, Default)]
1220#[allow(dead_code)]
1221pub(super) struct ResourceStats {
1222 active_validations: usize,
1224 local_candidates: usize,
1226 remote_candidates: usize,
1228 candidate_pairs: usize,
1230 peak_memory_usage: usize,
1232 cleanup_operations: u64,
1234 resources_cleaned: u64,
1236 allocation_failures: u64,
1238 last_cleanup: Option<Instant>,
1240 memory_pressure: f64,
1242}
1243#[derive(Debug)]
1245pub(super) struct ResourceCleanupCoordinator {
1246 config: ResourceManagementConfig,
1248 stats: ResourceStats,
1250 last_cleanup: Option<Instant>,
1252 cleanup_counter: u64,
1254 shutdown_requested: bool,
1256}
1257impl ResourceManagementConfig {
1258 fn new() -> Self {
1260 Self {
1261 max_active_validations: 100,
1262 max_local_candidates: 50,
1263 max_remote_candidates: 100,
1264 max_candidate_pairs: 200,
1265 max_coordination_history: 10,
1266 cleanup_interval: Duration::from_secs(30),
1267 candidate_timeout: Duration::from_secs(300), validation_timeout: Duration::from_secs(30),
1269 coordination_timeout: Duration::from_secs(60),
1270 memory_pressure_threshold: 0.75,
1271 aggressive_cleanup_threshold: 0.90,
1272 }
1273 }
1274 #[cfg(feature = "low_memory")]
1276 fn low_memory() -> Self {
1277 Self {
1278 max_active_validations: 25,
1279 max_local_candidates: 10,
1280 max_remote_candidates: 25,
1281 max_candidate_pairs: 50,
1282 max_coordination_history: 3,
1283 cleanup_interval: Duration::from_secs(15),
1284 candidate_timeout: Duration::from_secs(180), validation_timeout: Duration::from_secs(20),
1286 coordination_timeout: Duration::from_secs(30),
1287 memory_pressure_threshold: 0.60,
1288 aggressive_cleanup_threshold: 0.80,
1289 }
1290 }
1291}
1292#[allow(dead_code)]
1293impl ResourceCleanupCoordinator {
1294 fn new() -> Self {
1296 Self {
1297 config: ResourceManagementConfig::new(),
1298 stats: ResourceStats::default(),
1299 last_cleanup: None,
1300 cleanup_counter: 0,
1301 shutdown_requested: false,
1302 }
1303 }
1304 #[cfg(feature = "low_memory")]
1306 fn low_memory() -> Self {
1307 Self {
1308 config: ResourceManagementConfig::low_memory(),
1309 stats: ResourceStats::default(),
1310 last_cleanup: None,
1311 cleanup_counter: 0,
1312 shutdown_requested: false,
1313 }
1314 }
1315 fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1317 state.active_validations.len() > self.config.max_active_validations
1318 || state.local_candidates.len() > self.config.max_local_candidates
1319 || state.remote_candidates.len() > self.config.max_remote_candidates
1320 || state.candidate_pairs.len() > self.config.max_candidate_pairs
1321 }
1322 fn calculate_memory_pressure(
1324 &mut self,
1325 active_validations_len: usize,
1326 local_candidates_len: usize,
1327 remote_candidates_len: usize,
1328 candidate_pairs_len: usize,
1329 ) -> f64 {
1330 let total_limit = self.config.max_active_validations
1331 + self.config.max_local_candidates
1332 + self.config.max_remote_candidates
1333 + self.config.max_candidate_pairs;
1334 let current_usage = active_validations_len
1335 + local_candidates_len
1336 + remote_candidates_len
1337 + candidate_pairs_len;
1338
1339 let pressure = current_usage as f64 / total_limit as f64;
1340 self.stats.memory_pressure = pressure;
1341 pressure
1342 }
1343
1344 fn should_cleanup(&self, now: Instant) -> bool {
1346 if self.shutdown_requested {
1347 return true;
1348 }
1349 if let Some(last_cleanup) = self.last_cleanup {
1351 if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1352 return true;
1353 }
1354 } else {
1355 return true; }
1357
1358 if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1360 return true;
1361 }
1362
1363 false
1364 }
1365
1366 fn cleanup_expired_resources(
1368 &mut self,
1369 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1370 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1371 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1372 candidate_pairs: &mut Vec<CandidatePair>,
1373 coordination: &mut Option<CoordinationState>,
1374 now: Instant,
1375 ) -> u64 {
1376 let mut cleaned = 0;
1377 cleaned += self.cleanup_expired_validations(active_validations, now);
1379
1380 cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1382
1383 cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1385
1386 cleaned += self.cleanup_old_coordination(coordination, now);
1388
1389 self.stats.cleanup_operations += 1;
1391 self.stats.resources_cleaned += cleaned;
1392 self.last_cleanup = Some(now);
1393 self.cleanup_counter += 1;
1394
1395 debug!("Cleaned up {} expired resources", cleaned);
1396 cleaned
1397 }
1398
1399 fn cleanup_expired_validations(
1401 &mut self,
1402 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1403 now: Instant,
1404 ) -> u64 {
1405 let mut cleaned = 0;
1406 let validation_timeout = self.config.validation_timeout;
1407 active_validations.retain(|_addr, validation| {
1408 let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1409 if is_expired {
1410 cleaned += 1;
1411 trace!("Cleaned up expired validation for {:?}", _addr);
1412 }
1413 !is_expired
1414 });
1415
1416 cleaned
1417 }
1418
1419 fn cleanup_stale_candidates(
1421 &mut self,
1422 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1423 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1424 now: Instant,
1425 ) -> u64 {
1426 let mut cleaned = 0;
1427 let candidate_timeout = self.config.candidate_timeout;
1428 local_candidates.retain(|_seq, candidate| {
1430 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1431 || candidate.state == CandidateState::Failed
1432 || candidate.state == CandidateState::Removed;
1433 if is_stale {
1434 cleaned += 1;
1435 trace!("Cleaned up stale local candidate {:?}", candidate.address);
1436 }
1437 !is_stale
1438 });
1439
1440 remote_candidates.retain(|_seq, candidate| {
1442 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1443 || candidate.state == CandidateState::Failed
1444 || candidate.state == CandidateState::Removed;
1445 if is_stale {
1446 cleaned += 1;
1447 trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1448 }
1449 !is_stale
1450 });
1451
1452 cleaned
1453 }
1454
1455 fn cleanup_failed_pairs(
1457 &mut self,
1458 candidate_pairs: &mut Vec<CandidatePair>,
1459 now: Instant,
1460 ) -> u64 {
1461 let mut cleaned = 0;
1462 let pair_timeout = self.config.candidate_timeout;
1463 candidate_pairs.retain(|pair| {
1464 let is_stale = now.duration_since(pair.created_at) > pair_timeout
1465 || pair.state == PairState::Failed;
1466 if is_stale {
1467 cleaned += 1;
1468 trace!(
1469 "Cleaned up failed candidate pair {:?} -> {:?}",
1470 pair.local_addr, pair.remote_addr
1471 );
1472 }
1473 !is_stale
1474 });
1475
1476 cleaned
1477 }
1478
1479 fn cleanup_old_coordination(
1481 &mut self,
1482 coordination: &mut Option<CoordinationState>,
1483 now: Instant,
1484 ) -> u64 {
1485 let mut cleaned = 0;
1486 if let Some(coord) = coordination {
1487 let is_expired =
1488 now.duration_since(coord.round_start) > self.config.coordination_timeout;
1489 let is_failed = coord.state == CoordinationPhase::Failed;
1490
1491 if is_expired || is_failed {
1492 let round = coord.round;
1493 *coordination = None;
1494 cleaned += 1;
1495 trace!("Cleaned up old coordination state for round {}", round);
1496 }
1497 }
1498
1499 cleaned
1500 }
1501
1502 fn aggressive_cleanup(
1504 &mut self,
1505 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1506 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1507 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1508 candidate_pairs: &mut Vec<CandidatePair>,
1509 now: Instant,
1510 ) -> u64 {
1511 let mut cleaned = 0;
1512 let aggressive_timeout = self.config.candidate_timeout / 2;
1514
1515 local_candidates.retain(|_seq, candidate| {
1517 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1518 && candidate.state != CandidateState::Failed;
1519 if !keep {
1520 cleaned += 1;
1521 }
1522 keep
1523 });
1524
1525 remote_candidates.retain(|_seq, candidate| {
1526 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1527 && candidate.state != CandidateState::Failed;
1528 if !keep {
1529 cleaned += 1;
1530 }
1531 keep
1532 });
1533
1534 candidate_pairs.retain(|pair| {
1536 let keep = pair.state != PairState::Waiting
1537 || now.duration_since(pair.created_at) <= aggressive_timeout;
1538 if !keep {
1539 cleaned += 1;
1540 }
1541 keep
1542 });
1543
1544 active_validations.retain(|_addr, validation| {
1546 let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1547 if !keep {
1548 cleaned += 1;
1549 }
1550 keep
1551 });
1552
1553 warn!(
1554 "Aggressive cleanup removed {} resources due to memory pressure",
1555 cleaned
1556 );
1557 cleaned
1558 }
1559
1560 fn request_shutdown(&mut self) {
1562 self.shutdown_requested = true;
1563 debug!("Resource cleanup coordinator shutdown requested");
1564 }
1565 fn shutdown_cleanup(
1567 &mut self,
1568 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1569 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1570 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1571 candidate_pairs: &mut Vec<CandidatePair>,
1572 coordination: &mut Option<CoordinationState>,
1573 ) -> u64 {
1574 let mut cleaned = 0;
1575 cleaned += active_validations.len() as u64;
1577 active_validations.clear();
1578
1579 cleaned += local_candidates.len() as u64;
1580 local_candidates.clear();
1581
1582 cleaned += remote_candidates.len() as u64;
1583 remote_candidates.clear();
1584
1585 cleaned += candidate_pairs.len() as u64;
1586 candidate_pairs.clear();
1587
1588 if coordination.is_some() {
1589 *coordination = None;
1590 cleaned += 1;
1591 }
1592
1593 info!("Shutdown cleanup removed {} resources", cleaned);
1594 cleaned
1595 }
1596
1597 fn get_resource_stats(&self) -> &ResourceStats {
1599 &self.stats
1600 }
1601 fn update_stats(
1603 &mut self,
1604 active_validations_len: usize,
1605 local_candidates_len: usize,
1606 remote_candidates_len: usize,
1607 candidate_pairs_len: usize,
1608 ) {
1609 self.stats.active_validations = active_validations_len;
1610 self.stats.local_candidates = local_candidates_len;
1611 self.stats.remote_candidates = remote_candidates_len;
1612 self.stats.candidate_pairs = candidate_pairs_len;
1613 let current_usage = self.stats.active_validations
1615 + self.stats.local_candidates
1616 + self.stats.remote_candidates
1617 + self.stats.candidate_pairs;
1618
1619 if current_usage > self.stats.peak_memory_usage {
1620 self.stats.peak_memory_usage = current_usage;
1621 }
1622 }
1623
1624 pub(super) fn perform_cleanup(&mut self, now: Instant) {
1626 self.last_cleanup = Some(now);
1627 self.cleanup_counter += 1;
1628 self.stats.cleanup_operations += 1;
1630
1631 debug!("Performed resource cleanup #{}", self.cleanup_counter);
1632 }
1633}
1634
1635#[allow(dead_code)]
1636impl NetworkConditionMonitor {
1637 fn new() -> Self {
1639 Self {
1640 rtt_samples: VecDeque::new(),
1641 max_samples: 20,
1642 packet_loss_rate: 0.0,
1643 congestion_window: 10,
1644 quality_score: 0.8, last_quality_update: Instant::now(),
1646 quality_update_interval: Duration::from_secs(10),
1647 timeout_stats: TimeoutStatistics::default(),
1648 }
1649 }
1650 fn record_success(&mut self, rtt: Duration, now: Instant) {
1652 self.rtt_samples.push_back(rtt);
1654 if self.rtt_samples.len() > self.max_samples {
1655 self.rtt_samples.pop_front();
1656 }
1657 self.timeout_stats.total_responses += 1;
1659 self.update_timeout_stats(now);
1660
1661 self.update_quality_score(now);
1663 }
1664
1665 fn record_timeout(&mut self, now: Instant) {
1667 self.timeout_stats.total_timeouts += 1;
1668 self.update_timeout_stats(now);
1669 self.update_quality_score(now);
1671 }
1672
1673 fn update_timeout_stats(&mut self, now: Instant) {
1675 let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1676 if total_attempts > 0 {
1677 self.timeout_stats.timeout_rate =
1678 self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1679 }
1680
1681 if !self.rtt_samples.is_empty() {
1683 let total_rtt: Duration = self.rtt_samples.iter().sum();
1684 self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1685 }
1686
1687 self.timeout_stats.last_update = Some(now);
1688 }
1689
1690 fn update_quality_score(&mut self, now: Instant) {
1692 if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1693 return;
1694 }
1695 let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1697 let rtt_factor = self.calculate_rtt_factor();
1698 let consistency_factor = self.calculate_consistency_factor();
1699
1700 let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1702
1703 self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1705 self.last_quality_update = now;
1706 }
1707
1708 fn calculate_rtt_factor(&self) -> f64 {
1710 if self.rtt_samples.is_empty() {
1711 return 0.5; }
1713 let avg_rtt = self.timeout_stats.avg_response_time;
1714
1715 let rtt_ms = avg_rtt.as_millis() as f64;
1717 let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1718 factor.clamp(0.0, 1.0)
1719 }
1720
1721 fn calculate_consistency_factor(&self) -> f64 {
1723 if self.rtt_samples.len() < 3 {
1724 return 0.5; }
1726 let mean_rtt = self.timeout_stats.avg_response_time;
1728 let variance: f64 = self
1729 .rtt_samples
1730 .iter()
1731 .map(|rtt| {
1732 let diff = (*rtt).abs_diff(mean_rtt);
1733 diff.as_millis() as f64
1734 })
1735 .map(|diff| diff * diff)
1736 .sum::<f64>()
1737 / self.rtt_samples.len() as f64;
1738
1739 let std_dev = variance.sqrt();
1740
1741 let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1743 consistency.clamp(0.0, 1.0)
1744 }
1745
1746 fn get_quality_score(&self) -> f64 {
1748 self.quality_score
1749 }
1750 fn get_estimated_rtt(&self) -> Option<Duration> {
1752 if self.rtt_samples.is_empty() {
1753 return None;
1754 }
1755 Some(self.timeout_stats.avg_response_time)
1756 }
1757
1758 fn is_suitable_for_coordination(&self) -> bool {
1760 self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1762 }
1763 fn get_packet_loss_rate(&self) -> f64 {
1765 self.packet_loss_rate
1766 }
1767
1768 fn get_timeout_multiplier(&self) -> f64 {
1770 let base_multiplier = 1.0;
1771
1772 let quality_multiplier = if self.quality_score < 0.3 {
1774 2.0 } else if self.quality_score > 0.8 {
1776 0.8 } else {
1778 1.0 };
1780
1781 let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1783
1784 base_multiplier * quality_multiplier * loss_multiplier
1785 }
1786
1787 fn cleanup(&mut self, now: Instant) {
1789 let _cutoff_time = now - Duration::from_secs(60);
1791
1792 if let Some(last_update) = self.timeout_stats.last_update {
1794 if now.duration_since(last_update) > Duration::from_secs(300) {
1795 self.timeout_stats = TimeoutStatistics::default();
1796 }
1797 }
1798 }
1799}
1800
1801#[allow(dead_code)]
1802impl NatTraversalState {
1803 pub(super) fn new(max_candidates: u32, coordination_timeout: Duration) -> Self {
1808 let bootstrap_coordinator = Some(BootstrapCoordinator::new(BootstrapConfig::default()));
1810
1811 Self {
1812 local_candidates: HashMap::new(),
1814 remote_candidates: HashMap::new(),
1815 candidate_pairs: Vec::new(),
1816 pair_index: HashMap::new(),
1817 active_validations: HashMap::new(),
1818 coordination: None,
1819 next_sequence: VarInt::from_u32(1),
1820 max_candidates,
1821 coordination_timeout,
1822 stats: NatTraversalStats::default(),
1823 security_state: SecurityValidationState::new(),
1824 network_monitor: NetworkConditionMonitor::new(),
1825 resource_manager: ResourceCleanupCoordinator::new(),
1826 bootstrap_coordinator,
1827 }
1828 }
1829
1830 pub(super) fn add_remote_candidate(
1832 &mut self,
1833 sequence: VarInt,
1834 address: SocketAddr,
1835 priority: VarInt,
1836 now: Instant,
1837 ) -> Result<(), NatTraversalError> {
1838 if self.should_reject_new_resources(now) {
1840 debug!(
1841 "Rejecting new candidate due to resource limits: {}",
1842 address
1843 );
1844 return Err(NatTraversalError::ResourceLimitExceeded);
1845 }
1846 if self.security_state.is_candidate_rate_limited(now) {
1848 self.stats.rate_limit_violations += 1;
1849 debug!("Rate limit exceeded for candidate addition: {}", address);
1850 return Err(NatTraversalError::RateLimitExceeded);
1851 }
1852
1853 match self.security_state.validate_address(address, now) {
1855 AddressValidationResult::Invalid => {
1856 self.stats.invalid_address_rejections += 1;
1857 self.stats.security_rejections += 1;
1858 debug!("Invalid address rejected: {}", address);
1859 return Err(NatTraversalError::InvalidAddress);
1860 }
1861 AddressValidationResult::Suspicious => {
1862 self.stats.security_rejections += 1;
1863 debug!("Suspicious address rejected: {}", address);
1864 return Err(NatTraversalError::SecurityValidationFailed);
1865 }
1866 AddressValidationResult::Valid => {
1867 }
1869 }
1870
1871 if self.remote_candidates.len() >= self.max_candidates as usize {
1873 return Err(NatTraversalError::TooManyCandidates);
1874 }
1875
1876 if self
1878 .remote_candidates
1879 .values()
1880 .any(|c| c.address == address && c.state != CandidateState::Removed)
1881 {
1882 return Err(NatTraversalError::DuplicateAddress);
1883 }
1884
1885 let candidate = AddressCandidate {
1886 address,
1887 priority: priority.into_inner() as u32,
1888 source: CandidateSource::Peer,
1889 discovered_at: now,
1890 state: CandidateState::New,
1891 attempt_count: 0,
1892 last_attempt: None,
1893 };
1894
1895 self.remote_candidates.insert(sequence, candidate);
1896 self.stats.remote_candidates_received += 1;
1897
1898 trace!(
1899 "Added remote candidate: {} with priority {}",
1900 address, priority
1901 );
1902 Ok(())
1903 }
1904
1905 pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
1907 if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
1908 candidate.state = CandidateState::Removed;
1909 self.active_validations.remove(&candidate.address);
1911 true
1912 } else {
1913 false
1914 }
1915 }
1916
1917 #[allow(clippy::expect_used)]
1919 pub(super) fn add_local_candidate(
1920 &mut self,
1921 address: SocketAddr,
1922 source: CandidateSource,
1923 now: Instant,
1924 ) -> VarInt {
1925 let sequence = self.next_sequence;
1926 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
1927 .expect("sequence number overflow");
1928 let candidate_type = classify_candidate_type(source);
1930 let local_preference = self.calculate_local_preference(address);
1931 let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
1932
1933 let candidate = AddressCandidate {
1934 address,
1935 priority,
1936 source,
1937 discovered_at: now,
1938 state: CandidateState::New,
1939 attempt_count: 0,
1940 last_attempt: None,
1941 };
1942
1943 self.local_candidates.insert(sequence, candidate);
1944 self.stats.local_candidates_sent += 1;
1945
1946 self.generate_candidate_pairs(now);
1948
1949 sequence
1950 }
1951
1952 fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
1954 match addr {
1955 SocketAddr::V4(v4) => {
1956 if v4.ip().is_loopback() {
1957 0 } else if v4.ip().is_private() {
1959 65000 } else {
1961 32000 }
1963 }
1964 SocketAddr::V6(v6) => {
1965 if v6.ip().is_loopback() {
1966 0
1967 } else if v6.ip().segments()[0] == 0xfe80 {
1968 30000 } else {
1971 50000 }
1973 }
1974 }
1975 }
1976 pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
1978 self.candidate_pairs.clear();
1979 self.pair_index.clear();
1980 let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
1982 self.candidate_pairs.reserve(estimated_capacity);
1983 self.pair_index.reserve(estimated_capacity);
1984
1985 let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
1987
1988 for local_candidate in self.local_candidates.values() {
1989 if local_candidate.state == CandidateState::Removed {
1991 continue;
1992 }
1993
1994 let local_type = classify_candidate_type(local_candidate.source);
1996
1997 for (remote_seq, remote_candidate) in &self.remote_candidates {
1998 if remote_candidate.state == CandidateState::Removed {
2000 continue;
2001 }
2002
2003 let cache_key = (local_candidate.address, remote_candidate.address);
2005 let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
2006 are_candidates_compatible(local_candidate, remote_candidate)
2007 });
2008
2009 if !compatible {
2010 continue;
2011 }
2012
2013 let pair_priority =
2015 calculate_pair_priority(local_candidate.priority, remote_candidate.priority);
2016
2017 let remote_type = classify_candidate_type(remote_candidate.source);
2019 let pair_type = classify_pair_type(local_type, remote_type);
2020
2021 let pair = CandidatePair {
2022 remote_sequence: *remote_seq,
2023 local_addr: local_candidate.address,
2024 remote_addr: remote_candidate.address,
2025 priority: pair_priority,
2026 state: PairState::Waiting,
2027 pair_type,
2028 created_at: now,
2029 last_check: None,
2030 };
2031
2032 let index = self.candidate_pairs.len();
2034 self.pair_index.insert(remote_candidate.address, index);
2035 self.candidate_pairs.push(pair);
2036 }
2037 }
2038
2039 self.candidate_pairs
2041 .sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2042
2043 self.pair_index.clear();
2045 for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2046 self.pair_index.insert(pair.remote_addr, idx);
2047 }
2048
2049 trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2050 }
2051
2052 pub(super) fn get_next_validation_pairs(
2054 &mut self,
2055 max_concurrent: usize,
2056 ) -> Vec<&mut CandidatePair> {
2057 let mut result = Vec::with_capacity(max_concurrent);
2060 for pair in self.candidate_pairs.iter_mut() {
2061 if pair.state == PairState::Waiting {
2062 result.push(pair);
2063 if result.len() >= max_concurrent {
2064 break;
2065 }
2066 }
2067 }
2068
2069 result
2070 }
2071
2072 pub(super) fn find_pair_by_remote_addr(
2074 &mut self,
2075 addr: SocketAddr,
2076 ) -> Option<&mut CandidatePair> {
2077 if let Some(&index) = self.pair_index.get(&addr) {
2079 self.candidate_pairs.get_mut(index)
2080 } else {
2081 None
2082 }
2083 }
2084 pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2086 let (succeeded_type, succeeded_priority) = {
2088 if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2089 pair.state = PairState::Succeeded;
2090 (pair.pair_type, pair.priority)
2091 } else {
2092 return false;
2093 }
2094 };
2095 for other_pair in &mut self.candidate_pairs {
2097 if other_pair.pair_type == succeeded_type
2098 && other_pair.priority < succeeded_priority
2099 && other_pair.state == PairState::Waiting
2100 {
2101 other_pair.state = PairState::Frozen;
2102 }
2103 }
2104
2105 true
2106 }
2107
2108 pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2110 let mut best_ipv4: Option<&CandidatePair> = None;
2111 let mut best_ipv6: Option<&CandidatePair> = None;
2112 for pair in &self.candidate_pairs {
2113 if pair.state != PairState::Succeeded {
2114 continue;
2115 }
2116
2117 match pair.remote_addr {
2118 SocketAddr::V4(_) => {
2119 if best_ipv4.is_none_or(|best| pair.priority > best.priority) {
2120 best_ipv4 = Some(pair);
2121 }
2122 }
2123 SocketAddr::V6(_) => {
2124 if best_ipv6.is_none_or(|best| pair.priority > best.priority) {
2125 best_ipv6 = Some(pair);
2126 }
2127 }
2128 }
2129 }
2130
2131 let mut result = Vec::new();
2132 if let Some(pair) = best_ipv4 {
2133 result.push(pair);
2134 }
2135 if let Some(pair) = best_ipv6 {
2136 result.push(pair);
2137 }
2138 result
2139 }
2140
2141 pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2143 let mut candidates: Vec<_> = self
2144 .remote_candidates
2145 .iter()
2146 .filter(|(_, c)| c.state == CandidateState::New)
2147 .map(|(k, v)| (*k, v))
2148 .collect();
2149 candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2151 candidates
2152 }
2153
2154 pub(super) fn start_validation(
2156 &mut self,
2157 sequence: VarInt,
2158 challenge: u64,
2159 now: Instant,
2160 ) -> Result<(), NatTraversalError> {
2161 let candidate = self
2162 .remote_candidates
2163 .get_mut(&sequence)
2164 .ok_or(NatTraversalError::UnknownCandidate)?;
2165 if candidate.state != CandidateState::New {
2166 return Err(NatTraversalError::InvalidCandidateState);
2167 }
2168
2169 if Self::is_validation_suspicious(candidate, now) {
2171 self.stats.security_rejections += 1;
2172 debug!(
2173 "Suspicious validation attempt rejected for address {}",
2174 candidate.address
2175 );
2176 return Err(NatTraversalError::SecurityValidationFailed);
2177 }
2178
2179 if self.active_validations.len() >= 10 {
2181 debug!(
2182 "Too many concurrent validations, rejecting new validation for {}",
2183 candidate.address
2184 );
2185 return Err(NatTraversalError::SecurityValidationFailed);
2186 }
2187
2188 candidate.state = CandidateState::Validating;
2190 candidate.attempt_count += 1;
2191 candidate.last_attempt = Some(now);
2192
2193 let validation = PathValidationState {
2195 challenge,
2196 sent_at: now,
2197 retry_count: 0,
2198 max_retries: 3, coordination_round: self.coordination.as_ref().map(|c| c.round),
2200 timeout_state: AdaptiveTimeoutState::new(),
2201 last_retry_at: None,
2202 };
2203
2204 self.active_validations
2205 .insert(candidate.address, validation);
2206 trace!(
2207 "Started validation for candidate {} with challenge {}",
2208 candidate.address, challenge
2209 );
2210 Ok(())
2211 }
2212
2213 fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2215 if candidate.attempt_count > 10 {
2217 return true;
2218 }
2219 if let Some(last_attempt) = candidate.last_attempt {
2221 let time_since_last = now.duration_since(last_attempt);
2222 if time_since_last < Duration::from_millis(100) {
2223 return true; }
2225 }
2226
2227 if candidate.state == CandidateState::Failed {
2229 let time_since_discovery = now.duration_since(candidate.discovered_at);
2230 if time_since_discovery < Duration::from_secs(60) {
2231 return true; }
2233 }
2234
2235 false
2236 }
2237
2238 pub(super) fn handle_validation_success(
2240 &mut self,
2241 remote_addr: SocketAddr,
2242 challenge: u64,
2243 now: Instant,
2244 ) -> Result<VarInt, NatTraversalError> {
2245 let sequence = self
2247 .remote_candidates
2248 .iter()
2249 .find(|(_, c)| c.address == remote_addr)
2250 .map(|(seq, _)| *seq)
2251 .ok_or(NatTraversalError::UnknownCandidate)?;
2252 let validation = self
2254 .active_validations
2255 .get_mut(&remote_addr)
2256 .ok_or(NatTraversalError::NoActiveValidation)?;
2257
2258 if validation.challenge != challenge {
2259 return Err(NatTraversalError::ChallengeMismatch);
2260 }
2261
2262 let rtt = now.duration_since(validation.sent_at);
2264 validation.timeout_state.update_success(rtt);
2265
2266 self.network_monitor.record_success(rtt, now);
2268
2269 let candidate = self
2271 .remote_candidates
2272 .get_mut(&sequence)
2273 .ok_or(NatTraversalError::UnknownCandidate)?;
2274
2275 candidate.state = CandidateState::Valid;
2276 self.active_validations.remove(&remote_addr);
2277 self.stats.validations_succeeded += 1;
2278
2279 trace!(
2280 "Validation successful for {} with RTT {:?}",
2281 remote_addr, rtt
2282 );
2283 Ok(sequence)
2284 }
2285
2286 pub(super) fn start_coordination_round(
2288 &mut self,
2289 targets: Vec<PunchTarget>,
2290 now: Instant,
2291 ) -> Result<VarInt, NatTraversalError> {
2292 if self.security_state.is_coordination_rate_limited(now) {
2294 self.stats.rate_limit_violations += 1;
2295 debug!(
2296 "Rate limit exceeded for coordination request with {} targets",
2297 targets.len()
2298 );
2299 return Err(NatTraversalError::RateLimitExceeded);
2300 }
2301 if self.is_coordination_suspicious(&targets, now) {
2303 self.stats.suspicious_coordination_attempts += 1;
2304 self.stats.security_rejections += 1;
2305 debug!(
2306 "Suspicious coordination request rejected with {} targets",
2307 targets.len()
2308 );
2309 return Err(NatTraversalError::SuspiciousCoordination);
2310 }
2311
2312 for target in &targets {
2314 match self
2315 .security_state
2316 .validate_address(target.remote_addr, now)
2317 {
2318 AddressValidationResult::Invalid => {
2319 self.stats.invalid_address_rejections += 1;
2320 self.stats.security_rejections += 1;
2321 debug!(
2322 "Invalid target address in coordination: {}",
2323 target.remote_addr
2324 );
2325 return Err(NatTraversalError::InvalidAddress);
2326 }
2327 AddressValidationResult::Suspicious => {
2328 self.stats.security_rejections += 1;
2329 debug!(
2330 "Suspicious target address in coordination: {}",
2331 target.remote_addr
2332 );
2333 return Err(NatTraversalError::SecurityValidationFailed);
2334 }
2335 AddressValidationResult::Valid => {
2336 }
2338 }
2339 }
2340
2341 let round = self.next_sequence;
2342 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2343 .expect("sequence number overflow");
2344
2345 let coordination_grace = Duration::from_millis(500); let punch_start = now + coordination_grace;
2348
2349 self.coordination = Some(CoordinationState {
2350 round,
2351 punch_targets: targets,
2352 round_start: now,
2353 punch_start,
2354 round_duration: self.coordination_timeout,
2355 state: CoordinationPhase::Requesting,
2356 punch_request_sent: false,
2357 peer_punch_received: false,
2358 retry_count: 0,
2359 max_retries: 3,
2360 timeout_state: AdaptiveTimeoutState::new(),
2361 last_retry_at: None,
2362 });
2363
2364 self.stats.coordination_rounds += 1;
2365 trace!(
2366 "Started coordination round {} with {} targets",
2367 round,
2368 self.coordination
2369 .as_ref()
2370 .map(|c| c.punch_targets.len())
2371 .unwrap_or(0)
2372 );
2373 Ok(round)
2374 }
2375
2376 fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2378 if targets.len() > 20 {
2380 return true;
2381 }
2382 let mut seen_addresses = std::collections::HashSet::new();
2384 for target in targets {
2385 if !seen_addresses.insert(target.remote_addr) {
2386 return true; }
2388 }
2389
2390 if targets.len() > 5 {
2392 let mut ipv4_addresses: Vec<_> = targets
2394 .iter()
2395 .filter_map(|t| match t.remote_addr.ip() {
2396 IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2397 _ => None,
2398 })
2399 .collect();
2400
2401 if ipv4_addresses.len() >= 3 {
2402 ipv4_addresses.sort();
2403 let mut sequential_count = 1;
2404 for i in 1..ipv4_addresses.len() {
2405 if ipv4_addresses[i] == ipv4_addresses[i - 1] + 1 {
2406 sequential_count += 1;
2407 if sequential_count >= 3 {
2408 return true; }
2410 } else {
2411 sequential_count = 1;
2412 }
2413 }
2414 }
2415 }
2416
2417 false
2418 }
2419
2420 pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2422 self.coordination.as_ref().map(|c| c.state)
2423 }
2424 pub(super) fn should_send_punch_request(&self) -> bool {
2426 if let Some(coord) = &self.coordination {
2427 coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2428 } else {
2429 false
2430 }
2431 }
2432 pub(super) fn mark_punch_request_sent(&mut self) {
2434 if let Some(coord) = &mut self.coordination {
2435 coord.punch_request_sent = true;
2436 coord.state = CoordinationPhase::Coordinating;
2437 trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2438 }
2439 }
2440 pub(super) fn handle_peer_punch_request(
2442 &mut self,
2443 peer_round: VarInt,
2444 now: Instant,
2445 ) -> Result<bool, NatTraversalError> {
2446 if self.is_peer_coordination_suspicious(peer_round, now) {
2448 self.stats.suspicious_coordination_attempts += 1;
2449 self.stats.security_rejections += 1;
2450 debug!(
2451 "Suspicious peer coordination request rejected for round {}",
2452 peer_round
2453 );
2454 return Err(NatTraversalError::SuspiciousCoordination);
2455 }
2456 if let Some(coord) = &mut self.coordination {
2457 if coord.round == peer_round {
2458 match coord.state {
2459 CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2460 coord.peer_punch_received = true;
2461 coord.state = CoordinationPhase::Preparing;
2462
2463 let network_rtt = self
2465 .network_monitor
2466 .get_estimated_rtt()
2467 .unwrap_or(Duration::from_millis(100));
2468 let quality_score = self.network_monitor.get_quality_score();
2469
2470 let base_grace = Duration::from_millis(150);
2472 let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2473 let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2474
2475 let adaptive_grace = Duration::from_millis(
2476 (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64,
2477 );
2478
2479 coord.punch_start = now + adaptive_grace;
2480
2481 trace!(
2482 "Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2483 adaptive_grace, network_rtt, quality_score
2484 );
2485 Ok(true)
2486 }
2487 CoordinationPhase::Preparing => {
2488 trace!("Peer coordination confirmed during preparation");
2490 Ok(true)
2491 }
2492 _ => {
2493 debug!(
2494 "Received coordination in unexpected phase: {:?}",
2495 coord.state
2496 );
2497 Ok(false)
2498 }
2499 }
2500 } else {
2501 debug!(
2502 "Received coordination for wrong round: {} vs {}",
2503 peer_round, coord.round
2504 );
2505 Ok(false)
2506 }
2507 } else {
2508 debug!("Received peer coordination but no active round");
2509 Ok(false)
2510 }
2511 }
2512
2513 fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2515 if peer_round.into_inner() == 0 {
2517 return true; }
2519 if let Some(coord) = &self.coordination {
2521 let our_round = coord.round.into_inner();
2522 let peer_round_num = peer_round.into_inner();
2523
2524 if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2526 return true;
2527 }
2528 }
2529
2530 false
2531 }
2532
2533 pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2535 if let Some(coord) = &self.coordination {
2536 match coord.state {
2537 CoordinationPhase::Preparing => now >= coord.punch_start,
2538 CoordinationPhase::Coordinating => {
2539 coord.peer_punch_received && now >= coord.punch_start
2541 }
2542 _ => false,
2543 }
2544 } else {
2545 false
2546 }
2547 }
2548 pub(super) fn start_punching_phase(&mut self, now: Instant) {
2550 if let Some(coord) = &mut self.coordination {
2551 coord.state = CoordinationPhase::Punching;
2552 let network_rtt = self
2554 .network_monitor
2555 .get_estimated_rtt()
2556 .unwrap_or(Duration::from_millis(100));
2557
2558 let jitter_ms: u64 = rand::random::<u64>() % 11;
2560 let jitter = Duration::from_millis(jitter_ms);
2561 let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2562
2563 coord.punch_start = transmission_time.max(now);
2565
2566 trace!(
2567 "Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2568 coord.punch_start, network_rtt, jitter
2569 );
2570 }
2571 }
2572
2573 pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2575 self.coordination
2576 .as_ref()
2577 .map(|c| c.punch_targets.as_slice())
2578 }
2579 pub(super) fn mark_coordination_validating(&mut self) {
2581 if let Some(coord) = &mut self.coordination {
2582 if coord.state == CoordinationPhase::Punching {
2583 coord.state = CoordinationPhase::Validating;
2584 trace!("Coordination moved to validation phase");
2585 }
2586 }
2587 }
2588 pub(super) fn handle_coordination_success(
2590 &mut self,
2591 remote_addr: SocketAddr,
2592 now: Instant,
2593 ) -> bool {
2594 if let Some(coord) = &mut self.coordination {
2595 let was_target = coord
2597 .punch_targets
2598 .iter()
2599 .any(|target| target.remote_addr == remote_addr);
2600 if was_target && coord.state == CoordinationPhase::Validating {
2601 let rtt = now.duration_since(coord.round_start);
2603 coord.timeout_state.update_success(rtt);
2604 self.network_monitor.record_success(rtt, now);
2605
2606 coord.state = CoordinationPhase::Succeeded;
2607 self.stats.direct_connections += 1;
2608 trace!(
2609 "Coordination succeeded via {} with RTT {:?}",
2610 remote_addr, rtt
2611 );
2612 true
2613 } else {
2614 false
2615 }
2616 } else {
2617 false
2618 }
2619 }
2620
2621 pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2623 if let Some(coord) = &mut self.coordination {
2624 coord.retry_count += 1;
2625 coord.timeout_state.update_timeout();
2626 self.network_monitor.record_timeout(now);
2627 if coord.timeout_state.should_retry(coord.max_retries)
2629 && self.network_monitor.is_suitable_for_coordination()
2630 {
2631 coord.state = CoordinationPhase::Requesting;
2633 coord.punch_request_sent = false;
2634 coord.peer_punch_received = false;
2635 coord.round_start = now;
2636 coord.last_retry_at = Some(now);
2637
2638 let retry_delay = coord.timeout_state.get_retry_delay();
2640
2641 let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2643 let adjusted_delay = Duration::from_millis(
2644 (retry_delay.as_millis() as f64 * quality_multiplier) as u64,
2645 );
2646
2647 coord.punch_start = now + adjusted_delay;
2648
2649 trace!(
2650 "Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2651 coord.round,
2652 coord.retry_count + 1,
2653 adjusted_delay,
2654 self.network_monitor.get_quality_score()
2655 );
2656 true
2657 } else {
2658 coord.state = CoordinationPhase::Failed;
2659 self.stats.coordination_failures += 1;
2660
2661 if !self.network_monitor.is_suitable_for_coordination() {
2662 trace!(
2663 "Coordination failed due to poor network conditions (quality: {:.2})",
2664 self.network_monitor.get_quality_score()
2665 );
2666 } else {
2667 trace!("Coordination failed after {} attempts", coord.retry_count);
2668 }
2669 false
2670 }
2671 } else {
2672 false
2673 }
2674 }
2675
2676 pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2678 if let Some(coord) = &mut self.coordination {
2679 let timeout = coord.timeout_state.get_timeout();
2680 let elapsed = now.duration_since(coord.round_start);
2681 if elapsed > timeout {
2682 trace!(
2683 "Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2684 coord.round, elapsed, timeout
2685 );
2686 self.handle_coordination_failure(now);
2687 true
2688 } else {
2689 false
2690 }
2691 } else {
2692 false
2693 }
2694 }
2695
2696 pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2698 let mut expired_validations = Vec::new();
2699 let mut retry_validations = Vec::new();
2700
2701 for (addr, validation) in &mut self.active_validations {
2702 let timeout = validation.timeout_state.get_timeout();
2703 let elapsed = now.duration_since(validation.sent_at);
2704
2705 if elapsed >= timeout {
2706 if validation
2707 .timeout_state
2708 .should_retry(validation.max_retries)
2709 {
2710 retry_validations.push(*addr);
2712 } else {
2713 expired_validations.push(*addr);
2715 }
2716 }
2717 }
2718
2719 for addr in retry_validations {
2721 if let Some(validation) = self.active_validations.get_mut(&addr) {
2722 validation.retry_count += 1;
2723 validation.sent_at = now;
2724 validation.last_retry_at = Some(now);
2725 validation.timeout_state.update_timeout();
2726
2727 trace!(
2728 "Retrying validation for {} (attempt {})",
2729 addr,
2730 validation.retry_count + 1
2731 );
2732 }
2733 }
2734
2735 for addr in &expired_validations {
2737 self.active_validations.remove(addr);
2738 self.network_monitor.record_timeout(now);
2739 trace!("Validation expired for {}", addr);
2740 }
2741
2742 expired_validations
2743 }
2744
2745 pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2747 let mut retry_addresses = Vec::new();
2748
2749 for (addr, validation) in &mut self.active_validations {
2751 let elapsed = now.duration_since(validation.sent_at);
2752 let timeout = validation.timeout_state.get_timeout();
2753
2754 if elapsed > timeout
2755 && validation
2756 .timeout_state
2757 .should_retry(validation.max_retries)
2758 {
2759 validation.retry_count += 1;
2761 validation.last_retry_at = Some(now);
2762 validation.sent_at = now; validation.timeout_state.update_timeout();
2764
2765 retry_addresses.push(*addr);
2766 trace!(
2767 "Scheduled retry {} for validation to {}",
2768 validation.retry_count, addr
2769 );
2770 }
2771 }
2772
2773 retry_addresses
2774 }
2775
2776 pub(super) fn update_network_conditions(&mut self, now: Instant) {
2778 self.network_monitor.cleanup(now);
2779
2780 let multiplier = self.network_monitor.get_timeout_multiplier();
2782
2783 for validation in self.active_validations.values_mut() {
2785 if multiplier > 1.5 {
2786 validation.timeout_state.backoff_multiplier =
2788 (validation.timeout_state.backoff_multiplier * 1.2)
2789 .min(validation.timeout_state.max_backoff_multiplier);
2790 } else if multiplier < 0.8 {
2791 validation.timeout_state.backoff_multiplier =
2793 (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2794 }
2795 }
2796 }
2797
2798 pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2800 if let Some(coord) = &self.coordination {
2801 if coord.retry_count > 0 {
2802 if let Some(last_retry) = coord.last_retry_at {
2803 let retry_delay = coord.timeout_state.get_retry_delay();
2804 return now.duration_since(last_retry) >= retry_delay;
2805 }
2806 }
2807 }
2808 false
2809 }
2810
2811 pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2813 self.resource_manager.update_stats(
2815 self.active_validations.len(),
2816 self.local_candidates.len(),
2817 self.remote_candidates.len(),
2818 self.candidate_pairs.len(),
2819 );
2820
2821 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2823 self.active_validations.len(),
2824 self.local_candidates.len(),
2825 self.remote_candidates.len(),
2826 self.candidate_pairs.len(),
2827 );
2828
2829 let mut cleaned = 0;
2831
2832 if self.resource_manager.should_cleanup(now) {
2833 cleaned += self.resource_manager.cleanup_expired_resources(
2834 &mut self.active_validations,
2835 &mut self.local_candidates,
2836 &mut self.remote_candidates,
2837 &mut self.candidate_pairs,
2838 &mut self.coordination,
2839 now,
2840 );
2841
2842 if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2844 cleaned += self.resource_manager.aggressive_cleanup(
2845 &mut self.active_validations,
2846 &mut self.local_candidates,
2847 &mut self.remote_candidates,
2848 &mut self.candidate_pairs,
2849 now,
2850 );
2851 }
2852 }
2853
2854 cleaned
2855 }
2856
2857 pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
2859 self.resource_manager.update_stats(
2861 self.active_validations.len(),
2862 self.local_candidates.len(),
2863 self.remote_candidates.len(),
2864 self.candidate_pairs.len(),
2865 );
2866 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2867 self.active_validations.len(),
2868 self.local_candidates.len(),
2869 self.remote_candidates.len(),
2870 self.candidate_pairs.len(),
2871 );
2872 if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
2874 self.resource_manager.stats.allocation_failures += 1;
2875 return true;
2876 }
2877
2878 if self.resource_manager.check_resource_limits(self) {
2880 self.resource_manager.stats.allocation_failures += 1;
2881 return true;
2882 }
2883
2884 false
2885 }
2886
2887 pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
2889 let mut next_timeout = None;
2890 if let Some(coord) = &self.coordination {
2892 match coord.state {
2893 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2894 let timeout_at = coord.round_start + self.coordination_timeout;
2895 next_timeout =
2896 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2897 }
2898 CoordinationPhase::Preparing => {
2899 next_timeout = Some(
2901 next_timeout
2902 .map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)),
2903 );
2904 }
2905 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2906 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2908 next_timeout =
2909 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2910 }
2911 _ => {}
2912 }
2913 }
2914
2915 for validation in self.active_validations.values() {
2917 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2918 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2919 }
2920
2921 if self.resource_manager.should_cleanup(now) {
2923 let cleanup_at = now + Duration::from_secs(1);
2925 next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
2926 }
2927
2928 next_timeout
2929 }
2930
2931 pub(super) fn handle_timeout(
2933 &mut self,
2934 now: Instant,
2935 ) -> Result<Vec<TimeoutAction>, NatTraversalError> {
2936 let mut actions = Vec::new();
2937 if let Some(coord) = &mut self.coordination {
2939 match coord.state {
2940 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2941 let timeout_at = coord.round_start + self.coordination_timeout;
2942 if now >= timeout_at {
2943 coord.retry_count += 1;
2944 if coord.retry_count >= coord.max_retries {
2945 debug!("Coordination failed after {} retries", coord.retry_count);
2946 coord.state = CoordinationPhase::Failed;
2947 actions.push(TimeoutAction::Failed);
2948 } else {
2949 debug!(
2950 "Coordination timeout, retrying ({}/{})",
2951 coord.retry_count, coord.max_retries
2952 );
2953 coord.state = CoordinationPhase::Requesting;
2954 coord.round_start = now;
2955 actions.push(TimeoutAction::RetryCoordination);
2956 }
2957 }
2958 }
2959 CoordinationPhase::Preparing => {
2960 if now >= coord.punch_start {
2962 debug!("Starting coordinated hole punching");
2963 coord.state = CoordinationPhase::Punching;
2964 actions.push(TimeoutAction::StartValidation);
2965 }
2966 }
2967 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2968 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2969 if now >= timeout_at {
2970 coord.retry_count += 1;
2971 if coord.retry_count >= coord.max_retries {
2972 debug!("Validation failed after {} retries", coord.retry_count);
2973 coord.state = CoordinationPhase::Failed;
2974 actions.push(TimeoutAction::Failed);
2975 } else {
2976 debug!(
2977 "Validation timeout, retrying ({}/{})",
2978 coord.retry_count, coord.max_retries
2979 );
2980 coord.state = CoordinationPhase::Punching;
2981 actions.push(TimeoutAction::StartValidation);
2982 }
2983 }
2984 }
2985 CoordinationPhase::Succeeded => {
2986 actions.push(TimeoutAction::Complete);
2987 }
2988 CoordinationPhase::Failed => {
2989 actions.push(TimeoutAction::Failed);
2990 }
2991 _ => {}
2992 }
2993 }
2994
2995 let mut expired_validations = Vec::new();
2997 for (addr, validation) in &mut self.active_validations {
2998 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2999 if now >= timeout_at {
3000 validation.retry_count += 1;
3001 if validation.retry_count >= validation.max_retries {
3002 debug!("Path validation failed for {}: max retries exceeded", addr);
3003 expired_validations.push(*addr);
3004 } else {
3005 debug!(
3006 "Path validation timeout for {}, retrying ({}/{})",
3007 addr, validation.retry_count, validation.max_retries
3008 );
3009 validation.sent_at = now;
3010 validation.last_retry_at = Some(now);
3011 actions.push(TimeoutAction::StartValidation);
3012 }
3013 }
3014 }
3015
3016 for addr in expired_validations {
3018 self.active_validations.remove(&addr);
3019 }
3020
3021 if self.resource_manager.should_cleanup(now) {
3023 self.resource_manager.perform_cleanup(now);
3024 }
3025
3026 self.network_monitor.update_quality_score(now);
3028
3029 if self.coordination.is_none()
3031 && !self.local_candidates.is_empty()
3032 && !self.remote_candidates.is_empty()
3033 {
3034 actions.push(TimeoutAction::RetryDiscovery);
3035 }
3036
3037 Ok(actions)
3038 }
3039
3040 pub(super) fn handle_address_observation(
3046 &mut self,
3047 peer_id: [u8; 32],
3048 observed_address: SocketAddr,
3049 connection_id: crate::shared::ConnectionId,
3050 now: Instant,
3051 ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
3052 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3053 let connection_context = ConnectionContext {
3054 connection_id,
3055 original_destination: observed_address, };
3058
3059 bootstrap_coordinator.observe_peer_address(
3061 peer_id,
3062 observed_address,
3063 connection_context,
3064 now,
3065 )?;
3066
3067 let sequence = self.next_sequence;
3069 self.next_sequence =
3070 VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3071
3072 let priority = VarInt::from_u32(100); let add_address_frame =
3074 bootstrap_coordinator.generate_add_address_frame(peer_id, sequence, priority);
3075
3076 Ok(add_address_frame)
3077 } else {
3078 Ok(None)
3080 }
3081 }
3082
3083 pub(super) fn handle_punch_me_now_frame(
3088 &mut self,
3089 from_peer: [u8; 32],
3090 source_addr: SocketAddr,
3091 frame: &crate::frame::PunchMeNow,
3092 now: Instant,
3093 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3094 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3095 bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3096 } else {
3097 Ok(None)
3099 }
3100 }
3101 pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3105 self.bootstrap_coordinator
3106 .as_ref()
3107 .and_then(|coord| coord.peer_index.get(&peer_id).map(|p| p.observed_addr))
3108 }
3109
3110 pub(super) fn record_successful_callback_probe(
3115 &mut self,
3116 request_id: VarInt,
3117 source_address: SocketAddr,
3118 ) {
3119 debug!(
3120 "Recording successful callback probe: request_id={}, source={}",
3121 request_id, source_address
3122 );
3123 self.stats.callback_probes_received += 1;
3125 self.stats.callback_probes_successful += 1;
3126
3127 }
3130
3131 pub(super) fn record_failed_callback_probe(
3135 &mut self,
3136 request_id: VarInt,
3137 error_code: Option<crate::frame::TryConnectError>,
3138 ) {
3139 debug!(
3140 "Recording failed callback probe: request_id={}, error={:?}",
3141 request_id, error_code
3142 );
3143 self.stats.callback_probes_received += 1;
3145 self.stats.callback_probes_failed += 1;
3146 }
3147
3148 pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3150 debug!("Starting candidate discovery for NAT traversal");
3151 if self.local_candidates.is_empty() {
3153 debug!("Local candidates will be populated by discovery manager");
3156 }
3157
3158 Ok(())
3159 }
3160
3161 pub(super) fn queue_add_address_frame(
3163 &mut self,
3164 sequence: VarInt,
3165 address: SocketAddr,
3166 priority: u32,
3167 ) -> Result<(), NatTraversalError> {
3168 debug!(
3169 "Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3170 sequence, address, priority
3171 );
3172
3173 let candidate = AddressCandidate {
3175 address,
3176 priority,
3177 source: CandidateSource::Local,
3178 discovered_at: Instant::now(),
3179 state: CandidateState::New,
3180 attempt_count: 0,
3181 last_attempt: None,
3182 };
3183
3184 if !self.local_candidates.values().any(|c| c.address == address) {
3186 self.local_candidates.insert(sequence, candidate);
3187 }
3188
3189 Ok(())
3190 }
3191}
3192
3193#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3195#[allow(dead_code)]
3196pub(crate) enum NatTraversalError {
3197 TooManyCandidates,
3199 DuplicateAddress,
3201 UnknownCandidate,
3203 InvalidCandidateState,
3205 NoActiveValidation,
3207 ChallengeMismatch,
3209 NoActiveCoordination,
3211 SecurityValidationFailed,
3213 RateLimitExceeded,
3215 InvalidAddress,
3217 SuspiciousCoordination,
3219 ResourceLimitExceeded,
3221}
3222impl std::fmt::Display for NatTraversalError {
3223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3224 match self {
3225 Self::TooManyCandidates => write!(f, "too many candidates"),
3226 Self::DuplicateAddress => write!(f, "duplicate address"),
3227 Self::UnknownCandidate => write!(f, "unknown candidate"),
3228 Self::InvalidCandidateState => write!(f, "invalid candidate state"),
3229 Self::NoActiveValidation => write!(f, "no active validation"),
3230 Self::ChallengeMismatch => write!(f, "challenge mismatch"),
3231 Self::NoActiveCoordination => write!(f, "no active coordination"),
3232 Self::SecurityValidationFailed => write!(f, "security validation failed"),
3233 Self::RateLimitExceeded => write!(f, "rate limit exceeded"),
3234 Self::InvalidAddress => write!(f, "invalid address"),
3235 Self::SuspiciousCoordination => write!(f, "suspicious coordination request"),
3236 Self::ResourceLimitExceeded => write!(f, "resource limit exceeded"),
3237 }
3238 }
3239}
3240
3241impl std::error::Error for NatTraversalError {}
3242
3243#[derive(Debug, Clone)]
3245#[allow(dead_code)]
3246pub(crate) struct SecurityStats {
3247 pub total_security_rejections: u32,
3249 pub rate_limit_violations: u32,
3251 pub invalid_address_rejections: u32,
3253 pub suspicious_coordination_attempts: u32,
3255 pub active_validations: usize,
3257 pub cached_address_validations: usize,
3259 pub current_candidate_rate: usize,
3261 pub current_coordination_rate: usize,
3263}
3264#[derive(Debug)]
3269pub(crate) struct BootstrapCoordinator {
3270 address_observations: HashMap<SocketAddr, AddressObservation>,
3272 peer_index: HashMap<PeerId, ObservedPeer>,
3274 coordination_table: HashMap<VarInt, CoordinationEntry>,
3276 security_validator: SecurityValidationState,
3278 stats: BootstrapStats,
3280}
3281type PeerId = [u8; 32];
3284#[derive(Debug, Clone)]
3286struct ObservedPeer {
3287 observed_addr: SocketAddr,
3288}
3289
3290#[derive(Debug, Clone)]
3292struct CoordinationEntry {
3293 peer_b: Option<PeerId>,
3294 address_hint: SocketAddr,
3295}
3296#[derive(Debug, Clone)]
3298#[allow(dead_code)]
3299pub(crate) struct PeerObservationRecord {
3300 peer_id: PeerId,
3302 observed_address: SocketAddr,
3304 observed_at: Instant,
3306 connection_context: ConnectionContext,
3308 can_coordinate: bool,
3310 coordination_count: u32,
3312 success_rate: f64,
3314}
3315
3316#[derive(Debug, Clone)]
3320#[allow(dead_code)]
3321pub(crate) struct ConnectionContext {
3322 connection_id: ConnectionId,
3324 original_destination: SocketAddr,
3326 }
3328
3329#[derive(Debug, Clone)]
3333#[allow(dead_code)]
3334struct AddressObservation {
3335 address: SocketAddr,
3337 first_observed: Instant,
3339 observation_count: u32,
3341 validation_state: AddressValidationResult,
3343 associated_peers: Vec<PeerId>,
3345}
3346
3347#[derive(Debug, Clone, Default)]
3351pub(crate) struct BootstrapConfig {
3352 _unused: (),
3353}
3354#[derive(Debug, Clone, Default)]
3356pub(crate) struct BootstrapStats {
3357 total_observations: u64,
3359 total_coordinations: u64,
3361 successful_coordinations: u64,
3363 security_rejections: u64,
3365}
3366impl BootstrapCoordinator {
3368 pub(crate) fn new(_config: BootstrapConfig) -> Self {
3370 Self {
3371 address_observations: HashMap::new(),
3372 peer_index: HashMap::new(),
3373 coordination_table: HashMap::new(),
3374 security_validator: SecurityValidationState::new(),
3375 stats: BootstrapStats::default(),
3376 }
3377 }
3378 pub(crate) fn observe_peer_address(
3383 &mut self,
3384 peer_id: PeerId,
3385 observed_address: SocketAddr,
3386 _connection_context: ConnectionContext,
3387 now: Instant,
3388 ) -> Result<(), NatTraversalError> {
3389 match self
3391 .security_validator
3392 .validate_address(observed_address, now)
3393 {
3394 AddressValidationResult::Valid => {}
3395 AddressValidationResult::Invalid => {
3396 self.stats.security_rejections += 1;
3397 return Err(NatTraversalError::InvalidAddress);
3398 }
3399 AddressValidationResult::Suspicious => {
3400 self.stats.security_rejections += 1;
3401 return Err(NatTraversalError::SecurityValidationFailed);
3402 }
3403 }
3404
3405 if self.security_validator.is_candidate_rate_limited(now) {
3407 self.stats.security_rejections += 1;
3408 return Err(NatTraversalError::RateLimitExceeded);
3409 }
3410
3411 let observation = self
3413 .address_observations
3414 .entry(observed_address)
3415 .or_insert_with(|| AddressObservation {
3416 address: observed_address,
3417 first_observed: now,
3418 observation_count: 0,
3419 validation_state: AddressValidationResult::Valid,
3420 associated_peers: Vec::new(),
3421 });
3422
3423 observation.observation_count += 1;
3424 if !observation.associated_peers.contains(&peer_id) {
3425 observation.associated_peers.push(peer_id);
3426 }
3427
3428 self.peer_index.insert(
3430 peer_id,
3431 ObservedPeer {
3432 observed_addr: observed_address,
3433 },
3434 );
3435
3436 self.stats.total_observations += 1;
3438 debug!(
3441 "Observed peer {:?} at address {} (total observations: {})",
3442 peer_id, observed_address, self.stats.total_observations
3443 );
3444
3445 Ok(())
3446 }
3447
3448 pub(crate) fn generate_add_address_frame(
3453 &self,
3454 peer_id: PeerId,
3455 sequence: VarInt,
3456 priority: VarInt,
3457 ) -> Option<crate::frame::AddAddress> {
3458 let addr = self.peer_index.get(&peer_id)?.observed_addr;
3459 Some(crate::frame::AddAddress {
3460 sequence,
3461 address: addr,
3462 priority,
3463 })
3464 }
3465
3466 pub(crate) fn process_punch_me_now_frame(
3471 &mut self,
3472 from_peer: PeerId,
3473 source_addr: SocketAddr,
3474 frame: &crate::frame::PunchMeNow,
3475 now: Instant,
3476 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3477 if self
3479 .security_validator
3480 .is_adaptive_rate_limited(from_peer, now)
3481 {
3482 self.stats.security_rejections += 1;
3483 debug!(
3484 "PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3485 hex::encode(&from_peer[..8])
3486 );
3487 return Err(NatTraversalError::RateLimitExceeded);
3488 }
3489 self.security_validator
3491 .enhanced_address_validation(frame.address, source_addr, now)
3492 .inspect_err(|&e| {
3493 self.stats.security_rejections += 1;
3494 debug!(
3495 "PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3496 hex::encode(&from_peer[..8]),
3497 e
3498 );
3499 })?;
3500
3501 self.security_validator
3503 .validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3504 .inspect_err(|&e| {
3505 self.stats.security_rejections += 1;
3506 debug!(
3507 "PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3508 hex::encode(&from_peer[..8]),
3509 e
3510 );
3511 })?;
3512
3513 let _entry = self
3515 .coordination_table
3516 .entry(frame.round)
3517 .or_insert(CoordinationEntry {
3518 peer_b: frame.target_peer_id,
3519 address_hint: frame.address,
3520 });
3521 if let Some(peer_b) = frame.target_peer_id {
3523 if _entry.peer_b.is_none() {
3524 _entry.peer_b = Some(peer_b);
3525 }
3526 _entry.address_hint = frame.address;
3527 }
3528
3529 if let Some(_target_peer_id) = frame.target_peer_id {
3531 let coordination_frame = crate::frame::PunchMeNow {
3532 round: frame.round,
3533 paired_with_sequence_number: frame.paired_with_sequence_number,
3534 address: frame.address,
3535 target_peer_id: Some(from_peer),
3536 };
3537 self.stats.total_coordinations += 1;
3538 Ok(Some(coordination_frame))
3539 } else {
3540 self.stats.successful_coordinations += 1;
3542 Ok(None)
3543 }
3544 }
3545
3546 #[allow(dead_code)]
3552 pub(crate) fn cleanup_expired_sessions(&mut self, _now: Instant) {}
3553
3554 #[allow(dead_code)]
3559 pub(crate) fn poll_session_state_machine(&mut self, _now: Instant) -> Vec<()> {
3560 Vec::new()
3562 }
3563
3564 #[allow(dead_code)]
3568 fn cleanup_completed_sessions(&mut self, _now: Instant) {}
3569
3570 #[allow(dead_code)]
3575 fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
3576 let _ = peer_id;
3579 None
3580 }
3581 #[allow(dead_code)]
3602 pub(crate) fn get_peer_record(&self, _peer_id: PeerId) -> Option<&PeerObservationRecord> {
3603 None
3605 }
3606}
3607
3608#[cfg(test)]
3622mod tests {
3623 use super::*;
3624
3625 fn create_test_state() -> NatTraversalState {
3627 NatTraversalState::new(
3628 10, Duration::from_secs(30), )
3631 }
3632
3633 #[test]
3634 fn test_add_quic_discovered_address() {
3635 let mut state = create_test_state();
3637 let now = Instant::now();
3638
3639 let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5678));
3641 let seq = state.add_local_candidate(
3642 discovered_addr,
3643 CandidateSource::Observed { by_node: None },
3644 now,
3645 );
3646
3647 assert_eq!(state.local_candidates.len(), 1);
3649 let candidate = state.local_candidates.get(&seq).unwrap();
3650 assert_eq!(candidate.address, discovered_addr);
3651 assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
3652 assert_eq!(candidate.state, CandidateState::New);
3653
3654 assert!(candidate.priority > 0);
3656 }
3657
3658 #[test]
3659 fn test_add_multiple_quic_discovered_addresses() {
3660 let mut state = create_test_state();
3662 let now = Instant::now();
3663
3664 let addrs = vec![
3665 SocketAddr::from(([1, 2, 3, 4], 5678)),
3666 SocketAddr::from(([5, 6, 7, 8], 9012)),
3667 SocketAddr::from(([2001, 0xdb8, 0, 0, 0, 0, 0, 1], 443)),
3668 ];
3669
3670 let mut sequences = Vec::new();
3671 for addr in &addrs {
3672 let seq =
3673 state.add_local_candidate(*addr, CandidateSource::Observed { by_node: None }, now);
3674 sequences.push(seq);
3675 }
3676
3677 assert_eq!(state.local_candidates.len(), 3);
3679
3680 for (seq, addr) in sequences.iter().zip(&addrs) {
3682 let candidate = state.local_candidates.get(seq).unwrap();
3683 assert_eq!(candidate.address, *addr);
3684 assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
3685 }
3686 }
3687
3688 #[test]
3689 fn test_quic_discovered_addresses_in_local_candidates() {
3690 let mut state = create_test_state();
3692 let now = Instant::now();
3693
3694 let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
3696 let seq = state.add_local_candidate(addr, CandidateSource::Observed { by_node: None }, now);
3697
3698 assert!(state.local_candidates.contains_key(&seq));
3700 let candidate = state.local_candidates.get(&seq).unwrap();
3701 assert_eq!(candidate.address, addr);
3702
3703 assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
3705 }
3706
3707 #[test]
3708 fn test_quic_discovered_addresses_included_in_hole_punching() {
3709 let mut state = create_test_state();
3711 let now = Instant::now();
3712
3713 let local_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
3715 state.add_local_candidate(local_addr, CandidateSource::Observed { by_node: None }, now);
3716
3717 let remote_addr = SocketAddr::from(([1, 2, 3, 4], 6000));
3719 let priority = VarInt::from_u32(100);
3720 state
3721 .add_remote_candidate(VarInt::from_u32(1), remote_addr, priority, now)
3722 .expect("add remote candidate should succeed");
3723
3724 state.generate_candidate_pairs(now);
3726
3727 assert_eq!(state.candidate_pairs.len(), 1);
3729 let pair = &state.candidate_pairs[0];
3730 assert_eq!(pair.local_addr, local_addr);
3731 assert_eq!(pair.remote_addr, remote_addr);
3732 }
3733
3734 #[test]
3735 fn test_prioritize_quic_discovered_over_predicted() {
3736 let mut state = create_test_state();
3738 let now = Instant::now();
3739
3740 let predicted_addr = SocketAddr::from(([1, 2, 3, 4], 5000));
3742 let predicted_seq =
3743 state.add_local_candidate(predicted_addr, CandidateSource::Predicted, now);
3744
3745 let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5001));
3747 let discovered_seq = state.add_local_candidate(
3748 discovered_addr,
3749 CandidateSource::Observed { by_node: None },
3750 now,
3751 );
3752
3753 let predicted_priority = state.local_candidates.get(&predicted_seq).unwrap().priority;
3755 let discovered_priority = state
3756 .local_candidates
3757 .get(&discovered_seq)
3758 .unwrap()
3759 .priority;
3760
3761 assert!(discovered_priority >= predicted_priority);
3764 }
3765
3766 #[test]
3767 fn test_integration_with_nat_traversal_flow() {
3768 let mut state = create_test_state();
3770 let now = Instant::now();
3771
3772 let local_addr = SocketAddr::from(([192, 168, 1, 2], 5000));
3774 state.add_local_candidate(local_addr, CandidateSource::Local, now);
3775
3776 let discovered_addr = SocketAddr::from(([44, 55, 66, 77], 5000));
3777 state.add_local_candidate(
3778 discovered_addr,
3779 CandidateSource::Observed { by_node: None },
3780 now,
3781 );
3782
3783 let remote1 = SocketAddr::from(([93, 184, 215, 123], 6000));
3785 let remote2 = SocketAddr::from(([172, 217, 16, 34], 7000));
3786 let priority = VarInt::from_u32(100);
3787 state
3788 .add_remote_candidate(VarInt::from_u32(1), remote1, priority, now)
3789 .expect("add remote candidate should succeed");
3790 state
3791 .add_remote_candidate(VarInt::from_u32(2), remote2, priority, now)
3792 .expect("add remote candidate should succeed");
3793
3794 state.generate_candidate_pairs(now);
3796
3797 assert_eq!(state.candidate_pairs.len(), 4);
3799
3800 let discovered_pairs: Vec<_> = state
3802 .candidate_pairs
3803 .iter()
3804 .filter(|p| p.local_addr == discovered_addr)
3805 .collect();
3806 assert_eq!(discovered_pairs.len(), 2);
3807 }
3808}