1use std::{
2 collections::{HashMap, VecDeque},
3 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
4 time::Duration,
5};
6
7use crate::shared::ConnectionId;
8use tracing::{debug, info, trace, warn};
9
10use crate::{Instant, VarInt};
11
/// Aggregate NAT traversal state: address candidates, candidate pairs,
/// in-flight path validations, the current coordination round, statistics and
/// the helper subsystems (security validation, network monitoring, resource
/// cleanup).
#[derive(Debug)]
pub(super) struct NatTraversalState {
    /// Role this endpoint plays (see [`NatTraversalRole`]).
    pub(super) role: NatTraversalRole,
    /// Local address candidates keyed by a `VarInt`
    /// (presumably the candidate sequence number; confirm against callers).
    pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
    /// Remote address candidates, keyed the same way as `local_candidates`.
    pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
    /// Local/remote candidate pairs.
    pub(super) candidate_pairs: Vec<CandidatePair>,
    /// Socket address to `usize` lookup.
    /// NOTE(review): looks like an index into `candidate_pairs`; confirm.
    pub(super) pair_index: HashMap<SocketAddr, usize>,
    /// Path validations currently in flight, keyed by socket address.
    pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
    /// Current coordination round, if any.
    pub(super) coordination: Option<CoordinationState>,
    /// NOTE(review): presumably the next sequence number to hand out; its use
    /// is not visible in this part of the file.
    pub(super) next_sequence: VarInt,
    /// Configured candidate limit (enforcement is not visible in this chunk).
    pub(super) max_candidates: u32,
    /// Configured coordination timeout (use is not visible in this chunk).
    pub(super) coordination_timeout: Duration,
    /// Counters describing traversal activity.
    pub(super) stats: NatTraversalStats,
    /// Rate limiting and address/frame validation state.
    pub(super) security_state: SecurityValidationState,
    /// Network quality tracking state.
    pub(super) network_monitor: NetworkConditionMonitor,
    /// Resource limits and cleanup bookkeeping.
    pub(super) resource_manager: ResourceCleanupCoordinator,
    /// Bootstrap coordinator state, when present (type declared elsewhere).
    pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
    /// Multi-destination transmitter (type declared elsewhere; currently
    /// allowed to be unused).
    #[allow(dead_code)] pub(super) multi_dest_transmitter: MultiDestinationTransmitter,
}
52
/// Role an endpoint takes in NAT traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NatTraversalRole {
    /// Client role.
    Client,
    /// Server role; `can_relay` is a capability flag carried with the role
    /// (presumably whether the server can relay traffic; confirm).
    Server { can_relay: bool },
    /// Bootstrap role.
    Bootstrap,
}
63
/// A single address candidate together with its bookkeeping.
#[derive(Debug, Clone)]
pub(super) struct AddressCandidate {
    /// Socket address of the candidate.
    pub(super) address: SocketAddr,
    /// Candidate priority (see `calculate_candidate_priority`).
    pub(super) priority: u32,
    /// How the candidate was learned.
    pub(super) source: CandidateSource,
    /// Discovery time; the cleanup routines compare the candidate's age
    /// against the configured candidate timeout.
    pub(super) discovered_at: Instant,
    /// Lifecycle state; `Failed` and `Removed` candidates are dropped by the
    /// stale-candidate cleanup.
    pub(super) state: CandidateState,
    /// Attempt counter (updated outside this chunk).
    pub(super) attempt_count: u32,
    /// Time of the most recent attempt, if any (updated outside this chunk).
    pub(super) last_attempt: Option<Instant>,
}
82
/// Origin of an address candidate. `classify_candidate_type` maps each
/// variant to a [`CandidateType`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateSource {
    /// Classified as a host candidate.
    Local,
    /// Classified as server-reflexive; `by_node` optionally identifies the
    /// observer (presumably a node id; confirm).
    Observed { by_node: Option<VarInt> },
    /// Classified as peer-reflexive.
    Peer,
    /// Classified as server-reflexive.
    Predicted,
}
95
/// Lifecycle state of an [`AddressCandidate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateState {
    /// Newly added candidate.
    New,
    /// Validation in progress.
    Validating,
    /// Validation succeeded.
    Valid,
    /// Marked failed; removed by the stale-candidate cleanup.
    Failed,
    /// Marked removed; removed by the stale-candidate cleanup.
    Removed,
}
110
/// State of one in-flight path validation (stored per socket address in
/// `NatTraversalState::active_validations`).
#[derive(Debug)]
pub(super) struct PathValidationState {
    /// Challenge value associated with this validation.
    pub(super) challenge: u64,
    /// Send time; `cleanup_expired_validations` expires the entry once its
    /// age exceeds the configured validation timeout.
    pub(super) sent_at: Instant,
    /// Number of retries performed so far (updated outside this chunk).
    pub(super) retry_count: u32,
    /// Retry ceiling (enforced outside this chunk).
    pub(super) max_retries: u32,
    /// Coordination round this validation belongs to, if any (currently
    /// allowed to be unused).
    #[allow(dead_code)] pub(super) coordination_round: Option<VarInt>,
    /// Adaptive timeout state for this validation.
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Time of the most recent retry, if any.
    pub(super) last_retry_at: Option<Instant>,
}
130
/// State of the current coordination round.
#[derive(Debug)]
pub(super) struct CoordinationState {
    /// Round identifier; logged when the state is cleaned up.
    pub(super) round: VarInt,
    /// Targets for this round.
    pub(super) punch_targets: Vec<PunchTarget>,
    /// Round start time; `cleanup_old_coordination` drops the state once its
    /// age exceeds the configured coordination timeout.
    pub(super) round_start: Instant,
    /// Punch start time (use is not visible in this chunk).
    pub(super) punch_start: Instant,
    /// Round duration (currently allowed to be unused).
    #[allow(dead_code)] pub(super) round_duration: Duration,
    /// Current phase; a `Failed` phase makes the state eligible for cleanup.
    pub(super) state: CoordinationPhase,
    /// Flag: a punch request has been sent (set outside this chunk).
    pub(super) punch_request_sent: bool,
    /// Flag: the peer's punch has been received (set outside this chunk).
    pub(super) peer_punch_received: bool,
    /// Number of retries performed so far.
    pub(super) retry_count: u32,
    /// Retry ceiling.
    pub(super) max_retries: u32,
    /// Adaptive timeout state for the round.
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Time of the most recent retry, if any.
    pub(super) last_retry_at: Option<Instant>,
}
160
/// Phase of a coordination round. Only `Failed` is inspected in the visible
/// part of the file (it triggers cleanup of the coordination state); the
/// transitions between phases are driven elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)] pub(crate) enum CoordinationPhase {
    /// No round in progress.
    Idle,
    /// Requesting phase.
    Requesting,
    /// Coordinating phase.
    Coordinating,
    /// Preparing phase.
    Preparing,
    /// Punching phase.
    Punching,
    /// Validating phase.
    Validating,
    /// Round succeeded.
    Succeeded,
    /// Round failed; `cleanup_old_coordination` discards such a round.
    Failed,
}
182
/// One target of a coordination round.
#[derive(Debug, Clone)]
pub(super) struct PunchTarget {
    /// Remote socket address of the target.
    pub(super) remote_addr: SocketAddr,
    /// Remote sequence value for the target (currently allowed to be unused).
    #[allow(dead_code)]
    pub(super) remote_sequence: VarInt,
    /// Challenge value associated with the target.
    pub(super) challenge: u64,
}
194
/// Action selected when a timeout is handled. The code that produces and
/// consumes these values is outside this chunk, so the per-variant notes only
/// restate the names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum TimeoutAction {
    /// Retry discovery.
    RetryDiscovery,
    /// Retry coordination.
    RetryCoordination,
    /// Start validation.
    StartValidation,
    /// Traversal is complete.
    Complete,
    /// Traversal failed.
    Failed,
}
209
/// Target description for multi-destination transmission (currently allowed
/// to be unused; its consumer is not visible in this chunk).
#[derive(Debug, Clone)]
#[allow(dead_code)] pub(super) struct MultiDestPunchTarget {
    /// Destination socket address.
    pub destination: SocketAddr,
    /// Local socket address.
    pub local_addr: SocketAddr,
    /// Classification of the local/remote pairing.
    pub pair_type: PairType,
    /// Priority of this target.
    pub priority: u32,
    /// Creation time.
    pub created_at: Instant,
}
225
/// A pairing of one local and one remote candidate address.
#[derive(Debug, Clone)]
pub(super) struct CandidatePair {
    /// Sequence value of the remote candidate.
    pub(super) remote_sequence: VarInt,
    /// Local socket address of the pair.
    pub(super) local_addr: SocketAddr,
    /// Remote socket address of the pair.
    pub(super) remote_addr: SocketAddr,
    /// 64-bit pair priority (see `calculate_pair_priority`).
    pub(super) priority: u64,
    /// Check state; `Failed` pairs are removed by `cleanup_failed_pairs`.
    pub(super) state: PairState,
    /// Classification of the pairing.
    pub(super) pair_type: PairType,
    /// Creation time; the cleanup routines compare the pair's age against the
    /// configured candidate timeout.
    pub(super) created_at: Instant,
    /// Time of the last check, if any (currently allowed to be unused).
    #[allow(dead_code)] pub(super) last_check: Option<Instant>,
}
247
/// Check state of a [`CandidatePair`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum PairState {
    /// Waiting state.
    Waiting,
    /// The pair succeeded.
    Succeeded,
    /// The pair failed; `cleanup_failed_pairs` removes such pairs. The
    /// variant is currently allowed to be unconstructed.
    #[allow(dead_code)] Failed,
    /// Frozen state.
    Frozen,
}
261
/// Classification of a candidate pair by the types of its two candidates, as
/// produced by `classify_pair_type` (local type first, remote type second).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum PairType {
    /// Local host, remote host.
    HostToHost,
    /// Local host, remote server-reflexive.
    HostToServerReflexive,
    /// Local server-reflexive, remote host.
    ServerReflexiveToHost,
    /// Both server-reflexive.
    ServerReflexiveToServerReflexive,
    /// At least one side is peer-reflexive.
    PeerReflexive,
}
276
/// Candidate type used for priority calculation and pair classification.
/// `calculate_candidate_priority` assigns the type preferences noted below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CandidateType {
    /// Host candidate (type preference 126).
    Host,
    /// Server-reflexive candidate (type preference 100).
    ServerReflexive,
    /// Peer-reflexive candidate (type preference 110).
    PeerReflexive,
}
287
288fn calculate_candidate_priority(
291 candidate_type: CandidateType,
292 local_preference: u16,
293 component_id: u8,
294) -> u32 {
295 let type_preference = match candidate_type {
296 CandidateType::Host => 126,
297 CandidateType::PeerReflexive => 110,
298 CandidateType::ServerReflexive => 100,
299 };
300
301 (1u32 << 24) * type_preference + (1u32 << 8) * local_preference as u32 + component_id as u32
303}
304
/// Computes the 64-bit priority of a candidate pair from the two candidate
/// priorities:
///
/// `2^32 * min(G, D) + 2 * max(G, D) + (G > D ? 1 : 0)`
///
/// where `G` is `local_priority` and `D` is `remote_priority`.
///
/// For priorities in the upper half of the `u32` range the exact sum does not
/// fit in a `u64` (for example when both are `u32::MAX`). The additions
/// saturate at `u64::MAX` instead of overflowing, so such inputs neither panic
/// in debug builds nor wrap to a small priority in release builds. Every input
/// whose exact result fits yields that exact result.
fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
    let g = u64::from(local_priority);
    let d = u64::from(remote_priority);
    let tie_breaker = u64::from(g > d);

    // The shift cannot overflow: `min(g, d)` fits in 32 bits.
    (g.min(d) << 32)
        .saturating_add(2 * g.max(d))
        .saturating_add(tie_breaker)
}
314
315fn classify_candidate_type(source: CandidateSource) -> CandidateType {
317 match source {
318 CandidateSource::Local => CandidateType::Host,
319 CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
320 CandidateSource::Peer => CandidateType::PeerReflexive,
321 CandidateSource::Predicted => CandidateType::ServerReflexive, }
323}
324
325fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
327 match (local_type, remote_type) {
328 (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
329 (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
330 (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
331 (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => {
332 PairType::ServerReflexiveToServerReflexive
333 }
334 (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => {
335 PairType::PeerReflexive
336 }
337 }
338}
339
340fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
342 match (local.address, remote.address) {
344 (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
345 (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
346 _ => false, }
348}
349
/// Counters describing NAT traversal activity. All counters start at zero
/// (`Default`); the code that increments them is outside this chunk, so the
/// per-field notes only restate what each name indicates.
#[derive(Debug, Default, Clone)]
pub(crate) struct NatTraversalStats {
    /// Remote candidates received.
    pub(super) remote_candidates_received: u32,
    /// Local candidates sent.
    pub(super) local_candidates_sent: u32,
    /// Successful validations.
    pub(super) validations_succeeded: u32,
    /// Failed validations (currently allowed to be unused).
    #[allow(dead_code)] pub(super) validations_failed: u32,
    /// Coordination rounds.
    pub(super) coordination_rounds: u32,
    /// Successful coordinations (currently allowed to be unused).
    #[allow(dead_code)] pub(super) successful_coordinations: u32,
    /// Failed coordinations (currently allowed to be unused).
    #[allow(dead_code)] pub(super) failed_coordinations: u32,
    /// Timed-out coordinations (currently allowed to be unused).
    #[allow(dead_code)] pub(super) timed_out_coordinations: u32,
    /// Coordination failures.
    pub(super) coordination_failures: u32,
    /// Direct connections.
    pub(super) direct_connections: u32,
    /// Security rejections.
    pub(super) security_rejections: u32,
    /// Rate-limit violations.
    pub(super) rate_limit_violations: u32,
    /// Invalid-address rejections.
    pub(super) invalid_address_rejections: u32,
    /// Suspicious coordination attempts.
    pub(super) suspicious_coordination_attempts: u32,
}
386
/// Rate-limiting and address-validation state used to screen candidates and
/// coordination frames.
#[derive(Debug)]
pub(super) struct SecurityValidationState {
    /// Timestamps of accepted candidates, oldest first; entries older than
    /// `rate_window` are pruned before each rate check.
    candidate_rate_tracker: VecDeque<Instant>,
    /// Maximum number of candidates accepted per `rate_window`.
    max_candidates_per_window: u32,
    /// Length of the sliding window shared by both rate limiters.
    rate_window: Duration,
    /// Accepted coordination requests, oldest first; pruned like
    /// `candidate_rate_tracker`.
    coordination_requests: VecDeque<CoordinationRequest>,
    /// Maximum number of coordination requests accepted per `rate_window`.
    max_coordination_per_window: u32,
    /// Cached validation verdict per address. Entries are never expired by
    /// age; the cache is halved once it grows past 1000 entries.
    address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
    /// Intended lifetime of cache entries; set by the constructors but not
    /// read anywhere in the visible code.
    #[allow(dead_code)] validation_cache_timeout: Duration,
}
406
/// Record of one accepted coordination request, kept for rate limiting.
#[derive(Debug, Clone)]
struct CoordinationRequest {
    /// Time at which the request was accepted.
    timestamp: Instant,
}
413
/// Verdict produced by address validation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressValidationResult {
    /// The address passed every check.
    Valid,
    /// The address is unusable (e.g. unspecified, broadcast or loopback IPv4;
    /// unspecified or multicast IPv6).
    Invalid,
    /// The address is usable in principle but matches a heuristic (e.g. IPv4
    /// multicast/documentation ranges, repeated octets, port below 1024).
    Suspicious,
}
424
/// Adaptive timeout that combines RTT smoothing, exponential backoff and
/// random jitter.
#[derive(Debug, Clone)]
pub(super) struct AdaptiveTimeoutState {
    /// Timeout currently in effect; recomputed after each success/timeout.
    current_timeout: Duration,
    /// Lower clamp for `current_timeout` and the retry delay.
    min_timeout: Duration,
    /// Upper clamp for `current_timeout` and the retry delay.
    max_timeout: Duration,
    /// Timeout used while no RTT estimate is available.
    base_timeout: Duration,
    /// Current backoff factor: doubled per timeout (capped at
    /// `max_backoff_multiplier`), multiplied by 0.8 per success (floored at 1.0).
    backoff_multiplier: f64,
    /// Cap for `backoff_multiplier`.
    max_backoff_multiplier: f64,
    /// Relative jitter amplitude; the timeout is scaled by a random factor in
    /// `1 ± jitter_factor`.
    jitter_factor: f64,
    /// Smoothed RTT estimate (`7/8 * old + 1/8 * sample`).
    srtt: Option<Duration>,
    /// RTT variation estimate (`3/4 * old + 1/4 * |sample - srtt|`).
    rttvar: Option<Duration>,
    /// Most recent RTT sample.
    last_rtt: Option<Duration>,
    /// Timeouts since the last success; compared against the retry limit.
    consecutive_timeouts: u32,
    /// Total number of successful responses recorded.
    successful_responses: u32,
}
453
/// Network quality tracking state. The methods that maintain these fields are
/// outside this chunk, so the per-field notes only restate the names and types.
#[derive(Debug)]
pub(super) struct NetworkConditionMonitor {
    /// Collected RTT samples.
    rtt_samples: VecDeque<Duration>,
    /// NOTE(review): presumably the capacity bound for `rtt_samples`; confirm.
    max_samples: usize,
    /// Packet loss rate.
    packet_loss_rate: f64,
    /// Congestion window (currently allowed to be unused).
    #[allow(dead_code)] congestion_window: u32,
    /// Quality score.
    quality_score: f64,
    /// Time of the last quality update.
    last_quality_update: Instant,
    /// Interval between quality updates.
    quality_update_interval: Duration,
    /// Aggregate timeout/response statistics.
    timeout_stats: TimeoutStatistics,
}
475
/// Aggregate timeout and response statistics. All values start at their
/// `Default`; the code that updates them is outside this chunk.
#[derive(Debug, Default)]
struct TimeoutStatistics {
    /// Total number of timeouts.
    total_timeouts: u64,
    /// Total number of responses.
    total_responses: u64,
    /// Average response time.
    avg_response_time: Duration,
    /// Timeout rate.
    timeout_rate: f64,
    /// Time of the last update, if any.
    last_update: Option<Instant>,
}
490
491impl SecurityValidationState {
492 fn new() -> Self {
494 Self {
495 candidate_rate_tracker: VecDeque::new(),
496 max_candidates_per_window: 20, rate_window: Duration::from_secs(60),
498 coordination_requests: VecDeque::new(),
499 max_coordination_per_window: 5, address_validation_cache: HashMap::new(),
501 validation_cache_timeout: Duration::from_secs(300), }
503 }
504
505 #[allow(dead_code)] fn new_with_limits(
508 max_candidates_per_window: u32,
509 max_coordination_per_window: u32,
510 rate_window: Duration,
511 ) -> Self {
512 Self {
513 candidate_rate_tracker: VecDeque::new(),
514 max_candidates_per_window,
515 rate_window,
516 coordination_requests: VecDeque::new(),
517 max_coordination_per_window,
518 address_validation_cache: HashMap::new(),
519 validation_cache_timeout: Duration::from_secs(300),
520 }
521 }
522
523 fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
528 self.cleanup_rate_tracker(now);
530 self.cleanup_coordination_tracker(now);
531
532 let _current_candidate_rate =
534 self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
535 let _current_coordination_rate =
536 self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
537
538 let peer_reputation = self.calculate_peer_reputation(peer_id);
540 let adaptive_candidate_limit =
541 (self.max_candidates_per_window as f64 * peer_reputation) as u32;
542 let adaptive_coordination_limit =
543 (self.max_coordination_per_window as f64 * peer_reputation) as u32;
544
545 if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
547 debug!(
548 "Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
549 hex::encode(&peer_id[..8]),
550 self.candidate_rate_tracker.len(),
551 adaptive_candidate_limit
552 );
553 return true;
554 }
555
556 if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
557 debug!(
558 "Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
559 hex::encode(&peer_id[..8]),
560 self.coordination_requests.len(),
561 adaptive_coordination_limit
562 );
563 return true;
564 }
565
566 false
567 }
568
569 fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
574 1.0
584 }
585
586 fn validate_amplification_limits(
591 &mut self,
592 source_addr: SocketAddr,
593 target_addr: SocketAddr,
594 now: Instant,
595 ) -> Result<(), NatTraversalError> {
596 let amplification_key = (source_addr, target_addr);
598
599 if self.is_amplification_suspicious(amplification_key, now) {
608 warn!(
609 "Potential amplification attack detected: {} -> {}",
610 source_addr, target_addr
611 );
612 return Err(NatTraversalError::SuspiciousCoordination);
613 }
614
615 Ok(())
616 }
617
618 fn is_amplification_suspicious(
620 &self,
621 _amplification_key: (SocketAddr, SocketAddr),
622 _now: Instant,
623 ) -> bool {
624 false
634 }
635
636 #[allow(dead_code)] fn generate_secure_coordination_round(&self) -> VarInt {
642 let secure_random: u64 = rand::random();
644
645 let bounded_random = secure_random % 1000000; VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
649 }
650
651 fn enhanced_address_validation(
659 &mut self,
660 addr: SocketAddr,
661 source_addr: SocketAddr,
662 now: Instant,
663 ) -> Result<AddressValidationResult, NatTraversalError> {
664 let basic_result = self.validate_address(addr, now);
666
667 match basic_result {
668 AddressValidationResult::Invalid => {
669 return Err(NatTraversalError::InvalidAddress);
670 }
671 AddressValidationResult::Suspicious => {
672 return Err(NatTraversalError::SuspiciousCoordination);
673 }
674 AddressValidationResult::Valid => {
675 }
677 }
678
679 self.validate_amplification_limits(source_addr, addr, now)?;
681
682 if self.is_address_in_suspicious_range(addr) {
684 warn!("Address in suspicious range detected: {}", addr);
685 return Err(NatTraversalError::SuspiciousCoordination);
686 }
687
688 if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
689 warn!(
690 "Suspicious coordination pattern detected: {} -> {}",
691 source_addr, addr
692 );
693 return Err(NatTraversalError::SuspiciousCoordination);
694 }
695
696 Ok(AddressValidationResult::Valid)
697 }
698
699 fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
701 match addr.ip() {
702 IpAddr::V4(ipv4) => {
703 let octets = ipv4.octets();
705
706 if octets[0] == 0 || octets[0] == 127 {
708 return true;
709 }
710
711 if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
713 return true;
714 }
715 if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
716 return true;
717 }
718 if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
719 return true;
720 }
721
722 false
723 }
724 IpAddr::V6(ipv6) => {
725 if ipv6.is_loopback() || ipv6.is_unspecified() {
727 return true;
728 }
729
730 let segments = ipv6.segments();
732 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
733 return true;
734 }
735
736 false
737 }
738 }
739 }
740
741 fn is_coordination_pattern_suspicious(
743 &self,
744 _source_addr: SocketAddr,
745 _target_addr: SocketAddr,
746 _now: Instant,
747 ) -> bool {
748 false
758 }
759
760 fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
762 self.cleanup_rate_tracker(now);
764
765 if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
767 return true;
768 }
769
770 self.candidate_rate_tracker.push_back(now);
772 false
773 }
774
775 fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
777 self.cleanup_coordination_tracker(now);
779
780 if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
782 return true;
783 }
784
785 let request = CoordinationRequest { timestamp: now };
787 self.coordination_requests.push_back(request);
788 false
789 }
790
791 fn cleanup_rate_tracker(&mut self, now: Instant) {
793 let cutoff = now - self.rate_window;
794 while let Some(&front_time) = self.candidate_rate_tracker.front() {
795 if front_time < cutoff {
796 self.candidate_rate_tracker.pop_front();
797 } else {
798 break;
799 }
800 }
801 }
802
803 fn cleanup_coordination_tracker(&mut self, now: Instant) {
805 let cutoff = now - self.rate_window;
806 while let Some(front_request) = self.coordination_requests.front() {
807 if front_request.timestamp < cutoff {
808 self.coordination_requests.pop_front();
809 } else {
810 break;
811 }
812 }
813 }
814
815 fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
817 if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
819 return cached_result;
820 }
821
822 let result = self.perform_address_validation(addr);
823
824 self.address_validation_cache.insert(addr, result);
826
827 if self.address_validation_cache.len() > 1000 {
829 self.cleanup_address_cache(now);
830 }
831
832 result
833 }
834
835 fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
837 match addr.ip() {
838 IpAddr::V4(ipv4) => {
839 if ipv4.is_unspecified() || ipv4.is_broadcast() {
841 return AddressValidationResult::Invalid;
842 }
843
844 if ipv4.is_multicast() || ipv4.is_documentation() {
846 return AddressValidationResult::Suspicious;
847 }
848
849 if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
851 return AddressValidationResult::Invalid;
852 }
853
854 if self.is_suspicious_ipv4(ipv4) {
856 return AddressValidationResult::Suspicious;
857 }
858 }
859 IpAddr::V6(ipv6) => {
860 if ipv6.is_unspecified() || ipv6.is_multicast() {
862 return AddressValidationResult::Invalid;
863 }
864
865 if self.is_suspicious_ipv6(ipv6) {
867 return AddressValidationResult::Suspicious;
868 }
869 }
870 }
871
872 if addr.port() == 0 || addr.port() < 1024 {
874 return AddressValidationResult::Suspicious;
875 }
876
877 AddressValidationResult::Valid
878 }
879
880 fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
882 let octets = ipv4.octets();
883
884 if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
887 return true;
888 }
889
890 false
893 }
894
895 fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
897 let segments = ipv6.segments();
898
899 if segments.iter().all(|&s| s == segments[0]) {
901 return true;
902 }
903
904 false
905 }
906
907 fn cleanup_address_cache(&mut self, _now: Instant) {
909 if self.address_validation_cache.len() > 500 {
912 let keys_to_remove: Vec<_> = self
913 .address_validation_cache
914 .keys()
915 .take(self.address_validation_cache.len() / 2)
916 .copied()
917 .collect();
918
919 for key in keys_to_remove {
920 self.address_validation_cache.remove(&key);
921 }
922 }
923 }
924
925 fn validate_punch_me_now_frame(
933 &mut self,
934 frame: &crate::frame::PunchMeNow,
935 source_addr: SocketAddr,
936 peer_id: [u8; 32],
937 now: Instant,
938 ) -> Result<(), NatTraversalError> {
939 if self.is_coordination_rate_limited(now) {
941 debug!(
942 "PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}",
943 hex::encode(&peer_id[..8])
944 );
945 return Err(NatTraversalError::RateLimitExceeded);
946 }
947
948 let addr_validation = self.validate_address(frame.local_address, now);
950 match addr_validation {
951 AddressValidationResult::Invalid => {
952 debug!(
953 "PUNCH_ME_NOW frame rejected: invalid local_address {:?} from peer {:?}",
954 frame.local_address,
955 hex::encode(&peer_id[..8])
956 );
957 return Err(NatTraversalError::InvalidAddress);
958 }
959 AddressValidationResult::Suspicious => {
960 debug!(
961 "PUNCH_ME_NOW frame rejected: suspicious local_address {:?} from peer {:?}",
962 frame.local_address,
963 hex::encode(&peer_id[..8])
964 );
965 return Err(NatTraversalError::SuspiciousCoordination);
966 }
967 AddressValidationResult::Valid => {
968 }
970 }
971
972 if !self.validate_address_consistency(frame.local_address, source_addr) {
975 debug!(
976 "PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
977 frame.local_address, source_addr
978 );
979 return Err(NatTraversalError::SuspiciousCoordination);
980 }
981
982 if !self.validate_coordination_parameters(frame) {
984 debug!(
985 "PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
986 hex::encode(&peer_id[..8])
987 );
988 return Err(NatTraversalError::SuspiciousCoordination);
989 }
990
991 if let Some(target_peer_id) = frame.target_peer_id {
993 if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
994 debug!(
995 "PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
996 hex::encode(&peer_id[..8]),
997 hex::encode(&target_peer_id[..8])
998 );
999 return Err(NatTraversalError::SuspiciousCoordination);
1000 }
1001 }
1002
1003 if !self.validate_resource_limits(frame) {
1005 debug!(
1006 "PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
1007 hex::encode(&peer_id[..8])
1008 );
1009 return Err(NatTraversalError::ResourceLimitExceeded);
1010 }
1011
1012 debug!(
1013 "PUNCH_ME_NOW frame validation passed for peer {:?}",
1014 hex::encode(&peer_id[..8])
1015 );
1016 Ok(())
1017 }
1018
1019 fn validate_address_consistency(
1024 &self,
1025 claimed_addr: SocketAddr,
1026 observed_addr: SocketAddr,
1027 ) -> bool {
1028 match (claimed_addr.ip(), observed_addr.ip()) {
1033 (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
1034 if claimed_ip == observed_ip {
1036 return true;
1037 }
1038
1039 if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
1041 return true;
1042 }
1043
1044 !claimed_ip.is_private() && !observed_ip.is_private()
1047 }
1048 (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
1049 claimed_ip == observed_ip || self.are_in_same_prefix_v6(claimed_ip, observed_ip)
1051 }
1052 _ => {
1053 false
1055 }
1056 }
1057 }
1058
1059 fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
1061 let ip1_octets = ip1.octets();
1063 let ip2_octets = ip2.octets();
1064
1065 if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
1067 return true;
1068 }
1069
1070 if ip1_octets[0] == 172
1072 && ip2_octets[0] == 172
1073 && (16..=31).contains(&ip1_octets[1])
1074 && (16..=31).contains(&ip2_octets[1])
1075 {
1076 return true;
1077 }
1078
1079 if ip1_octets[0] == 192
1081 && ip1_octets[1] == 168
1082 && ip2_octets[0] == 192
1083 && ip2_octets[1] == 168
1084 {
1085 return true;
1086 }
1087
1088 false
1089 }
1090
1091 fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1093 let segments1 = ip1.segments();
1095 let segments2 = ip2.segments();
1096
1097 segments1[0] == segments2[0]
1098 && segments1[1] == segments2[1]
1099 && segments1[2] == segments2[2]
1100 && segments1[3] == segments2[3]
1101 }
1102
1103 fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1105 if frame.round.into_inner() > 1000000 {
1107 return false;
1108 }
1109
1110 if frame.target_sequence.into_inner() > 10000 {
1112 return false;
1113 }
1114
1115 match frame.local_address.ip() {
1117 IpAddr::V4(ipv4) => {
1118 !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1120 }
1121 IpAddr::V6(ipv6) => {
1122 !ipv6.is_unspecified() && !ipv6.is_multicast()
1124 }
1125 }
1126 }
1127
1128 fn validate_target_peer_request(
1130 &self,
1131 requesting_peer: [u8; 32],
1132 target_peer: [u8; 32],
1133 _frame: &crate::frame::PunchMeNow,
1134 ) -> bool {
1135 if requesting_peer == target_peer {
1137 return false;
1138 }
1139
1140 true
1146 }
1147
1148 fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1150 self.coordination_requests.len() < self.max_coordination_per_window as usize
1159 }
1160}
1161
1162impl AdaptiveTimeoutState {
1163 pub(crate) fn new() -> Self {
1165 let base_timeout = Duration::from_millis(1000); Self {
1167 current_timeout: base_timeout,
1168 min_timeout: Duration::from_millis(100),
1169 max_timeout: Duration::from_secs(30),
1170 base_timeout,
1171 backoff_multiplier: 1.0,
1172 max_backoff_multiplier: 8.0,
1173 jitter_factor: 0.1, srtt: None,
1175 rttvar: None,
1176 last_rtt: None,
1177 consecutive_timeouts: 0,
1178 successful_responses: 0,
1179 }
1180 }
1181
1182 fn update_success(&mut self, rtt: Duration) {
1184 self.last_rtt = Some(rtt);
1185 self.successful_responses += 1;
1186 self.consecutive_timeouts = 0;
1187
1188 match self.srtt {
1190 None => {
1191 self.srtt = Some(rtt);
1192 self.rttvar = Some(rtt / 2);
1193 }
1194 Some(srtt) => {
1195 let rttvar = self.rttvar.unwrap_or(rtt / 2);
1196 let abs_diff = if rtt > srtt { rtt - srtt } else { srtt - rtt };
1197
1198 self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1199 self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1200 }
1201 }
1202
1203 self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1205
1206 self.calculate_current_timeout();
1208 }
1209
1210 fn update_timeout(&mut self) {
1212 self.consecutive_timeouts += 1;
1213
1214 self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1216
1217 self.calculate_current_timeout();
1219 }
1220
1221 fn calculate_current_timeout(&mut self) {
1223 let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1224 srtt + rttvar * 4
1226 } else {
1227 self.base_timeout
1228 };
1229
1230 let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1232
1233 let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1235 let timeout = timeout.mul_f64(jitter);
1236
1237 self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1239 }
1240
1241 fn get_timeout(&self) -> Duration {
1243 self.current_timeout
1244 }
1245
1246 fn should_retry(&self, max_retries: u32) -> bool {
1248 self.consecutive_timeouts < max_retries
1249 }
1250
1251 fn get_retry_delay(&self) -> Duration {
1253 let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1254 delay.clamp(self.min_timeout, self.max_timeout)
1255 }
1256}
1257
/// Limits, timeouts and thresholds that drive resource cleanup.
#[derive(Debug)]
pub(super) struct ResourceManagementConfig {
    /// Maximum number of in-flight path validations.
    max_active_validations: usize,
    /// Maximum number of local candidates.
    max_local_candidates: usize,
    /// Maximum number of remote candidates.
    max_remote_candidates: usize,
    /// Maximum number of candidate pairs.
    max_candidate_pairs: usize,
    /// Maximum coordination history length (currently allowed to be unused).
    #[allow(dead_code)] max_coordination_history: usize,
    /// Minimum time between periodic cleanups.
    cleanup_interval: Duration,
    /// Age after which candidates and candidate pairs are considered stale.
    candidate_timeout: Duration,
    /// Age after which an in-flight validation is considered expired.
    validation_timeout: Duration,
    /// Age after which a coordination round is considered expired.
    coordination_timeout: Duration,
    /// Memory-pressure ratio above which cleanup is triggered.
    memory_pressure_threshold: f64,
    /// Threshold for aggressive cleanup (its use is not visible in this chunk).
    aggressive_cleanup_threshold: f64,
}
1285
/// Resource usage and cleanup statistics. In the visible code only
/// `memory_pressure`, `cleanup_operations` and `resources_cleaned` are written;
/// the other fields are maintained elsewhere, if at all.
#[derive(Debug, Default)]
pub(super) struct ResourceStats {
    /// Number of active validations.
    active_validations: usize,
    /// Number of local candidates.
    local_candidates: usize,
    /// Number of remote candidates.
    remote_candidates: usize,
    /// Number of candidate pairs.
    candidate_pairs: usize,
    /// Peak memory usage.
    peak_memory_usage: usize,
    /// Number of completed `cleanup_expired_resources` passes.
    cleanup_operations: u64,
    /// Total number of resources removed by those passes.
    resources_cleaned: u64,
    /// Number of allocation failures.
    allocation_failures: u64,
    /// Time of the last cleanup (currently allowed to be unused).
    #[allow(dead_code)] last_cleanup: Option<Instant>,
    /// Most recent usage/limit ratio computed by `calculate_memory_pressure`.
    memory_pressure: f64,
}
1311
/// Decides when cleanup should run and performs it against the traversal
/// state's collections.
#[derive(Debug)]
pub(super) struct ResourceCleanupCoordinator {
    /// Limits, timeouts and thresholds in effect.
    config: ResourceManagementConfig,
    /// Usage and cleanup statistics.
    stats: ResourceStats,
    /// Time of the last cleanup pass; `None` until the first pass, which makes
    /// `should_cleanup` return `true`.
    last_cleanup: Option<Instant>,
    /// Number of cleanup passes performed.
    cleanup_counter: u64,
    /// When set, `should_cleanup` always returns `true`.
    shutdown_requested: bool,
}
1326
1327impl ResourceManagementConfig {
1328 fn new() -> Self {
1330 Self {
1331 max_active_validations: 100,
1332 max_local_candidates: 50,
1333 max_remote_candidates: 100,
1334 max_candidate_pairs: 200,
1335 max_coordination_history: 10,
1336 cleanup_interval: Duration::from_secs(30),
1337 candidate_timeout: Duration::from_secs(300), validation_timeout: Duration::from_secs(30),
1339 coordination_timeout: Duration::from_secs(60),
1340 memory_pressure_threshold: 0.75,
1341 aggressive_cleanup_threshold: 0.90,
1342 }
1343 }
1344
1345 #[allow(dead_code)] fn low_memory() -> Self {
1348 Self {
1349 max_active_validations: 25,
1350 max_local_candidates: 10,
1351 max_remote_candidates: 25,
1352 max_candidate_pairs: 50,
1353 max_coordination_history: 3,
1354 cleanup_interval: Duration::from_secs(15),
1355 candidate_timeout: Duration::from_secs(180), validation_timeout: Duration::from_secs(20),
1357 coordination_timeout: Duration::from_secs(30),
1358 memory_pressure_threshold: 0.60,
1359 aggressive_cleanup_threshold: 0.80,
1360 }
1361 }
1362}
1363
1364impl ResourceCleanupCoordinator {
1365 fn new() -> Self {
1367 Self {
1368 config: ResourceManagementConfig::new(),
1369 stats: ResourceStats::default(),
1370 last_cleanup: None,
1371 cleanup_counter: 0,
1372 shutdown_requested: false,
1373 }
1374 }
1375
1376 #[allow(dead_code)] fn low_memory() -> Self {
1379 Self {
1380 config: ResourceManagementConfig::low_memory(),
1381 stats: ResourceStats::default(),
1382 last_cleanup: None,
1383 cleanup_counter: 0,
1384 shutdown_requested: false,
1385 }
1386 }
1387
1388 fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1390 state.active_validations.len() > self.config.max_active_validations
1391 || state.local_candidates.len() > self.config.max_local_candidates
1392 || state.remote_candidates.len() > self.config.max_remote_candidates
1393 || state.candidate_pairs.len() > self.config.max_candidate_pairs
1394 }
1395
1396 fn calculate_memory_pressure(
1398 &mut self,
1399 active_validations_len: usize,
1400 local_candidates_len: usize,
1401 remote_candidates_len: usize,
1402 candidate_pairs_len: usize,
1403 ) -> f64 {
1404 let total_limit = self.config.max_active_validations
1405 + self.config.max_local_candidates
1406 + self.config.max_remote_candidates
1407 + self.config.max_candidate_pairs;
1408
1409 let current_usage = active_validations_len
1410 + local_candidates_len
1411 + remote_candidates_len
1412 + candidate_pairs_len;
1413
1414 let pressure = current_usage as f64 / total_limit as f64;
1415 self.stats.memory_pressure = pressure;
1416 pressure
1417 }
1418
1419 fn should_cleanup(&self, now: Instant) -> bool {
1421 if self.shutdown_requested {
1422 return true;
1423 }
1424
1425 if let Some(last_cleanup) = self.last_cleanup {
1427 if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1428 return true;
1429 }
1430 } else {
1431 return true; }
1433
1434 if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1436 return true;
1437 }
1438
1439 false
1440 }
1441
1442 #[allow(dead_code)] fn cleanup_expired_resources(
1445 &mut self,
1446 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1447 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1448 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1449 candidate_pairs: &mut Vec<CandidatePair>,
1450 coordination: &mut Option<CoordinationState>,
1451 now: Instant,
1452 ) -> u64 {
1453 let mut cleaned = 0;
1454
1455 cleaned += self.cleanup_expired_validations(active_validations, now);
1457
1458 cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1460
1461 cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1463
1464 cleaned += self.cleanup_old_coordination(coordination, now);
1466
1467 self.stats.cleanup_operations += 1;
1469 self.stats.resources_cleaned += cleaned;
1470 self.last_cleanup = Some(now);
1471 self.cleanup_counter += 1;
1472
1473 debug!("Cleaned up {} expired resources", cleaned);
1474 cleaned
1475 }
1476
1477 #[allow(dead_code)] fn cleanup_expired_validations(
1480 &mut self,
1481 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1482 now: Instant,
1483 ) -> u64 {
1484 let mut cleaned = 0;
1485 let validation_timeout = self.config.validation_timeout;
1486
1487 active_validations.retain(|_addr, validation| {
1488 let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1489 if is_expired {
1490 cleaned += 1;
1491 trace!("Cleaned up expired validation for {:?}", _addr);
1492 }
1493 !is_expired
1494 });
1495
1496 cleaned
1497 }
1498
1499 #[allow(dead_code)] fn cleanup_stale_candidates(
1502 &mut self,
1503 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1504 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1505 now: Instant,
1506 ) -> u64 {
1507 let mut cleaned = 0;
1508 let candidate_timeout = self.config.candidate_timeout;
1509
1510 local_candidates.retain(|_seq, candidate| {
1512 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1513 || candidate.state == CandidateState::Failed
1514 || candidate.state == CandidateState::Removed;
1515 if is_stale {
1516 cleaned += 1;
1517 trace!("Cleaned up stale local candidate {:?}", candidate.address);
1518 }
1519 !is_stale
1520 });
1521
1522 remote_candidates.retain(|_seq, candidate| {
1524 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1525 || candidate.state == CandidateState::Failed
1526 || candidate.state == CandidateState::Removed;
1527 if is_stale {
1528 cleaned += 1;
1529 trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1530 }
1531 !is_stale
1532 });
1533
1534 cleaned
1535 }
1536
1537 #[allow(dead_code)] fn cleanup_failed_pairs(
1540 &mut self,
1541 candidate_pairs: &mut Vec<CandidatePair>,
1542 now: Instant,
1543 ) -> u64 {
1544 let mut cleaned = 0;
1545 let pair_timeout = self.config.candidate_timeout;
1546
1547 candidate_pairs.retain(|pair| {
1548 let is_stale = now.duration_since(pair.created_at) > pair_timeout
1549 || pair.state == PairState::Failed;
1550 if is_stale {
1551 cleaned += 1;
1552 trace!(
1553 "Cleaned up failed candidate pair {:?} -> {:?}",
1554 pair.local_addr, pair.remote_addr
1555 );
1556 }
1557 !is_stale
1558 });
1559
1560 cleaned
1561 }
1562
1563 #[allow(dead_code)] fn cleanup_old_coordination(
1566 &mut self,
1567 coordination: &mut Option<CoordinationState>,
1568 now: Instant,
1569 ) -> u64 {
1570 let mut cleaned = 0;
1571
1572 if let Some(coord) = coordination {
1573 let is_expired =
1574 now.duration_since(coord.round_start) > self.config.coordination_timeout;
1575 let is_failed = coord.state == CoordinationPhase::Failed;
1576
1577 if is_expired || is_failed {
1578 let round = coord.round;
1579 *coordination = None;
1580 cleaned += 1;
1581 trace!("Cleaned up old coordination state for round {}", round);
1582 }
1583 }
1584
1585 cleaned
1586 }
1587
1588 #[allow(dead_code)] fn aggressive_cleanup(
1591 &mut self,
1592 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1593 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1594 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1595 candidate_pairs: &mut Vec<CandidatePair>,
1596 now: Instant,
1597 ) -> u64 {
1598 let mut cleaned = 0;
1599
1600 let aggressive_timeout = self.config.candidate_timeout / 2;
1602
1603 local_candidates.retain(|_seq, candidate| {
1605 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1606 && candidate.state != CandidateState::Failed;
1607 if !keep {
1608 cleaned += 1;
1609 }
1610 keep
1611 });
1612
1613 remote_candidates.retain(|_seq, candidate| {
1614 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1615 && candidate.state != CandidateState::Failed;
1616 if !keep {
1617 cleaned += 1;
1618 }
1619 keep
1620 });
1621
1622 candidate_pairs.retain(|pair| {
1624 let keep = pair.state != PairState::Waiting
1625 || now.duration_since(pair.created_at) <= aggressive_timeout;
1626 if !keep {
1627 cleaned += 1;
1628 }
1629 keep
1630 });
1631
1632 active_validations.retain(|_addr, validation| {
1634 let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1635 if !keep {
1636 cleaned += 1;
1637 }
1638 keep
1639 });
1640
1641 warn!(
1642 "Aggressive cleanup removed {} resources due to memory pressure",
1643 cleaned
1644 );
1645 cleaned
1646 }
1647
1648 #[allow(dead_code)] fn request_shutdown(&mut self) {
1651 self.shutdown_requested = true;
1652 debug!("Resource cleanup coordinator shutdown requested");
1653 }
1654
1655 #[allow(dead_code)] fn shutdown_cleanup(
1658 &mut self,
1659 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1660 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1661 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1662 candidate_pairs: &mut Vec<CandidatePair>,
1663 coordination: &mut Option<CoordinationState>,
1664 ) -> u64 {
1665 let mut cleaned = 0;
1666
1667 cleaned += active_validations.len() as u64;
1669 active_validations.clear();
1670
1671 cleaned += local_candidates.len() as u64;
1672 local_candidates.clear();
1673
1674 cleaned += remote_candidates.len() as u64;
1675 remote_candidates.clear();
1676
1677 cleaned += candidate_pairs.len() as u64;
1678 candidate_pairs.clear();
1679
1680 if coordination.is_some() {
1681 *coordination = None;
1682 cleaned += 1;
1683 }
1684
1685 info!("Shutdown cleanup removed {} resources", cleaned);
1686 cleaned
1687 }
1688
1689 #[allow(dead_code)] fn get_resource_stats(&self) -> &ResourceStats {
1692 &self.stats
1693 }
1694
1695 fn update_stats(
1697 &mut self,
1698 active_validations_len: usize,
1699 local_candidates_len: usize,
1700 remote_candidates_len: usize,
1701 candidate_pairs_len: usize,
1702 ) {
1703 self.stats.active_validations = active_validations_len;
1704 self.stats.local_candidates = local_candidates_len;
1705 self.stats.remote_candidates = remote_candidates_len;
1706 self.stats.candidate_pairs = candidate_pairs_len;
1707
1708 let current_usage = self.stats.active_validations
1710 + self.stats.local_candidates
1711 + self.stats.remote_candidates
1712 + self.stats.candidate_pairs;
1713
1714 if current_usage > self.stats.peak_memory_usage {
1715 self.stats.peak_memory_usage = current_usage;
1716 }
1717 }
1718
1719 pub(super) fn perform_cleanup(&mut self, now: Instant) {
1721 self.last_cleanup = Some(now);
1722 self.cleanup_counter += 1;
1723
1724 self.stats.cleanup_operations += 1;
1726
1727 debug!("Performed resource cleanup #{}", self.cleanup_counter);
1728 }
1729}
1730
1731impl NetworkConditionMonitor {
1732 fn new() -> Self {
1734 Self {
1735 rtt_samples: VecDeque::new(),
1736 max_samples: 20,
1737 packet_loss_rate: 0.0,
1738 congestion_window: 10,
1739 quality_score: 0.8, last_quality_update: Instant::now(),
1741 quality_update_interval: Duration::from_secs(10),
1742 timeout_stats: TimeoutStatistics::default(),
1743 }
1744 }
1745
1746 fn record_success(&mut self, rtt: Duration, now: Instant) {
1748 self.rtt_samples.push_back(rtt);
1750 if self.rtt_samples.len() > self.max_samples {
1751 self.rtt_samples.pop_front();
1752 }
1753
1754 self.timeout_stats.total_responses += 1;
1756 self.update_timeout_stats(now);
1757
1758 self.update_quality_score(now);
1760 }
1761
1762 fn record_timeout(&mut self, now: Instant) {
1764 self.timeout_stats.total_timeouts += 1;
1765 self.update_timeout_stats(now);
1766
1767 self.update_quality_score(now);
1769 }
1770
1771 fn update_timeout_stats(&mut self, now: Instant) {
1773 let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1774
1775 if total_attempts > 0 {
1776 self.timeout_stats.timeout_rate =
1777 self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1778 }
1779
1780 if !self.rtt_samples.is_empty() {
1782 let total_rtt: Duration = self.rtt_samples.iter().sum();
1783 self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1784 }
1785
1786 self.timeout_stats.last_update = Some(now);
1787 }
1788
1789 fn update_quality_score(&mut self, now: Instant) {
1791 if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1792 return;
1793 }
1794
1795 let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1797 let rtt_factor = self.calculate_rtt_factor();
1798 let consistency_factor = self.calculate_consistency_factor();
1799
1800 let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1802
1803 self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1805 self.last_quality_update = now;
1806 }
1807
1808 fn calculate_rtt_factor(&self) -> f64 {
1810 if self.rtt_samples.is_empty() {
1811 return 0.5; }
1813
1814 let avg_rtt = self.timeout_stats.avg_response_time;
1815
1816 let rtt_ms = avg_rtt.as_millis() as f64;
1818 let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1819 factor.clamp(0.0, 1.0)
1820 }
1821
1822 fn calculate_consistency_factor(&self) -> f64 {
1824 if self.rtt_samples.len() < 3 {
1825 return 0.5; }
1827
1828 let mean_rtt = self.timeout_stats.avg_response_time;
1830 let variance: f64 = self
1831 .rtt_samples
1832 .iter()
1833 .map(|rtt| {
1834 let diff = if *rtt > mean_rtt {
1835 *rtt - mean_rtt
1836 } else {
1837 mean_rtt - *rtt
1838 };
1839 diff.as_millis() as f64
1840 })
1841 .map(|diff| diff * diff)
1842 .sum::<f64>()
1843 / self.rtt_samples.len() as f64;
1844
1845 let std_dev = variance.sqrt();
1846
1847 let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1849 consistency.clamp(0.0, 1.0)
1850 }
1851
1852 fn get_quality_score(&self) -> f64 {
1854 self.quality_score
1855 }
1856
1857 fn get_estimated_rtt(&self) -> Option<Duration> {
1859 if self.rtt_samples.is_empty() {
1860 return None;
1861 }
1862
1863 Some(self.timeout_stats.avg_response_time)
1864 }
1865
1866 fn is_suitable_for_coordination(&self) -> bool {
1868 self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1870 }
1871
1872 #[allow(dead_code)] fn get_packet_loss_rate(&self) -> f64 {
1875 self.packet_loss_rate
1876 }
1877
1878 #[allow(dead_code)] fn get_timeout_multiplier(&self) -> f64 {
1881 let base_multiplier = 1.0;
1882
1883 let quality_multiplier = if self.quality_score < 0.3 {
1885 2.0 } else if self.quality_score > 0.8 {
1887 0.8 } else {
1889 1.0 };
1891
1892 let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1894
1895 base_multiplier * quality_multiplier * loss_multiplier
1896 }
1897
1898 #[allow(dead_code)] fn cleanup(&mut self, now: Instant) {
1901 let _cutoff_time = now - Duration::from_secs(60);
1903
1904 if let Some(last_update) = self.timeout_stats.last_update {
1906 if now.duration_since(last_update) > Duration::from_secs(300) {
1907 self.timeout_stats = TimeoutStatistics::default();
1908 }
1909 }
1910 }
1911}
1912
1913impl NatTraversalState {
1914 pub(super) fn new(
1916 role: NatTraversalRole,
1917 max_candidates: u32,
1918 coordination_timeout: Duration,
1919 ) -> Self {
1920 let bootstrap_coordinator = if matches!(role, NatTraversalRole::Bootstrap) {
1921 Some(BootstrapCoordinator::new(BootstrapConfig::default()))
1922 } else {
1923 None
1924 };
1925
1926 Self {
1927 role,
1928 local_candidates: HashMap::new(),
1929 remote_candidates: HashMap::new(),
1930 candidate_pairs: Vec::new(),
1931 pair_index: HashMap::new(),
1932 active_validations: HashMap::new(),
1933 coordination: None,
1934 next_sequence: VarInt::from_u32(1),
1935 max_candidates,
1936 coordination_timeout,
1937 stats: NatTraversalStats::default(),
1938 security_state: SecurityValidationState::new(),
1939 network_monitor: NetworkConditionMonitor::new(),
1940 resource_manager: ResourceCleanupCoordinator::new(),
1941 bootstrap_coordinator,
1942 multi_dest_transmitter: MultiDestinationTransmitter::new(),
1943 }
1944 }
1945
1946 pub(super) fn add_remote_candidate(
1948 &mut self,
1949 sequence: VarInt,
1950 address: SocketAddr,
1951 priority: VarInt,
1952 now: Instant,
1953 ) -> Result<(), NatTraversalError> {
1954 if self.should_reject_new_resources(now) {
1956 debug!(
1957 "Rejecting new candidate due to resource limits: {}",
1958 address
1959 );
1960 return Err(NatTraversalError::ResourceLimitExceeded);
1961 }
1962
1963 if self.security_state.is_candidate_rate_limited(now) {
1965 self.stats.rate_limit_violations += 1;
1966 debug!("Rate limit exceeded for candidate addition: {}", address);
1967 return Err(NatTraversalError::RateLimitExceeded);
1968 }
1969
1970 match self.security_state.validate_address(address, now) {
1972 AddressValidationResult::Invalid => {
1973 self.stats.invalid_address_rejections += 1;
1974 self.stats.security_rejections += 1;
1975 debug!("Invalid address rejected: {}", address);
1976 return Err(NatTraversalError::InvalidAddress);
1977 }
1978 AddressValidationResult::Suspicious => {
1979 self.stats.security_rejections += 1;
1980 debug!("Suspicious address rejected: {}", address);
1981 return Err(NatTraversalError::SecurityValidationFailed);
1982 }
1983 AddressValidationResult::Valid => {
1984 }
1986 }
1987
1988 if self.remote_candidates.len() >= self.max_candidates as usize {
1990 return Err(NatTraversalError::TooManyCandidates);
1991 }
1992
1993 if self
1995 .remote_candidates
1996 .values()
1997 .any(|c| c.address == address && c.state != CandidateState::Removed)
1998 {
1999 return Err(NatTraversalError::DuplicateAddress);
2000 }
2001
2002 let candidate = AddressCandidate {
2003 address,
2004 priority: priority.into_inner() as u32,
2005 source: CandidateSource::Peer,
2006 discovered_at: now,
2007 state: CandidateState::New,
2008 attempt_count: 0,
2009 last_attempt: None,
2010 };
2011
2012 self.remote_candidates.insert(sequence, candidate);
2013 self.stats.remote_candidates_received += 1;
2014
2015 trace!(
2016 "Added remote candidate: {} with priority {}",
2017 address, priority
2018 );
2019 Ok(())
2020 }
2021
2022 pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
2024 if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
2025 candidate.state = CandidateState::Removed;
2026
2027 self.active_validations.remove(&candidate.address);
2029 true
2030 } else {
2031 false
2032 }
2033 }
2034
2035 #[allow(dead_code)] pub(super) fn add_local_candidate(
2038 &mut self,
2039 address: SocketAddr,
2040 source: CandidateSource,
2041 now: Instant,
2042 ) -> VarInt {
2043 let sequence = self.next_sequence;
2044 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2045 .expect("sequence number overflow");
2046
2047 let candidate_type = classify_candidate_type(source);
2049 let local_preference = self.calculate_local_preference(address);
2050 let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
2051
2052 let candidate = AddressCandidate {
2053 address,
2054 priority,
2055 source,
2056 discovered_at: now,
2057 state: CandidateState::New,
2058 attempt_count: 0,
2059 last_attempt: None,
2060 };
2061
2062 self.local_candidates.insert(sequence, candidate);
2063 self.stats.local_candidates_sent += 1;
2064
2065 self.generate_candidate_pairs(now);
2067
2068 sequence
2069 }
2070
2071 #[allow(dead_code)] fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
2074 match addr {
2075 SocketAddr::V4(v4) => {
2076 if v4.ip().is_loopback() {
2077 0 } else if v4.ip().is_private() {
2079 65000 } else {
2081 32000 }
2083 }
2084 SocketAddr::V6(v6) => {
2085 if v6.ip().is_loopback() {
2086 0
2087 } else if v6.ip().is_unicast_link_local() {
2088 30000 } else {
2090 50000 }
2092 }
2093 }
2094 }
2095
2096 pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
2098 self.candidate_pairs.clear();
2099 self.pair_index.clear();
2100
2101 let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
2103 self.candidate_pairs.reserve(estimated_capacity);
2104 self.pair_index.reserve(estimated_capacity);
2105
2106 let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
2108
2109 for (_local_seq, local_candidate) in &self.local_candidates {
2110 if local_candidate.state == CandidateState::Removed {
2112 continue;
2113 }
2114
2115 let local_type = classify_candidate_type(local_candidate.source);
2117
2118 for (remote_seq, remote_candidate) in &self.remote_candidates {
2119 if remote_candidate.state == CandidateState::Removed {
2121 continue;
2122 }
2123
2124 let cache_key = (local_candidate.address, remote_candidate.address);
2126 let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
2127 are_candidates_compatible(local_candidate, remote_candidate)
2128 });
2129
2130 if !compatible {
2131 continue;
2132 }
2133
2134 let pair_priority =
2136 calculate_pair_priority(local_candidate.priority, remote_candidate.priority);
2137
2138 let remote_type = classify_candidate_type(remote_candidate.source);
2140 let pair_type = classify_pair_type(local_type, remote_type);
2141
2142 let pair = CandidatePair {
2143 remote_sequence: *remote_seq,
2144 local_addr: local_candidate.address,
2145 remote_addr: remote_candidate.address,
2146 priority: pair_priority,
2147 state: PairState::Waiting,
2148 pair_type,
2149 created_at: now,
2150 last_check: None,
2151 };
2152
2153 let index = self.candidate_pairs.len();
2155 self.pair_index.insert(remote_candidate.address, index);
2156 self.candidate_pairs.push(pair);
2157 }
2158 }
2159
2160 self.candidate_pairs
2162 .sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2163
2164 self.pair_index.clear();
2166 for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2167 self.pair_index.insert(pair.remote_addr, idx);
2168 }
2169
2170 trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2171 }
2172
2173 pub(super) fn get_next_validation_pairs(
2175 &mut self,
2176 max_concurrent: usize,
2177 ) -> Vec<&mut CandidatePair> {
2178 let mut result = Vec::with_capacity(max_concurrent);
2181
2182 for pair in self.candidate_pairs.iter_mut() {
2183 if pair.state == PairState::Waiting {
2184 result.push(pair);
2185 if result.len() >= max_concurrent {
2186 break;
2187 }
2188 }
2189 }
2190
2191 result
2192 }
2193
2194 pub(super) fn find_pair_by_remote_addr(
2196 &mut self,
2197 addr: SocketAddr,
2198 ) -> Option<&mut CandidatePair> {
2199 if let Some(&index) = self.pair_index.get(&addr) {
2201 self.candidate_pairs.get_mut(index)
2202 } else {
2203 None
2204 }
2205 }
2206
2207 pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2209 let (succeeded_type, succeeded_priority) = {
2211 if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2212 pair.state = PairState::Succeeded;
2213 (pair.pair_type, pair.priority)
2214 } else {
2215 return false;
2216 }
2217 };
2218
2219 for other_pair in &mut self.candidate_pairs {
2221 if other_pair.pair_type == succeeded_type
2222 && other_pair.priority < succeeded_priority
2223 && other_pair.state == PairState::Waiting
2224 {
2225 other_pair.state = PairState::Frozen;
2226 }
2227 }
2228
2229 true
2230 }
2231
2232 pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2234 let mut best_ipv4: Option<&CandidatePair> = None;
2235 let mut best_ipv6: Option<&CandidatePair> = None;
2236
2237 for pair in &self.candidate_pairs {
2238 if pair.state != PairState::Succeeded {
2239 continue;
2240 }
2241
2242 match pair.remote_addr {
2243 SocketAddr::V4(_) => {
2244 if best_ipv4.map_or(true, |best| pair.priority > best.priority) {
2245 best_ipv4 = Some(pair);
2246 }
2247 }
2248 SocketAddr::V6(_) => {
2249 if best_ipv6.map_or(true, |best| pair.priority > best.priority) {
2250 best_ipv6 = Some(pair);
2251 }
2252 }
2253 }
2254 }
2255
2256 let mut result = Vec::new();
2257 if let Some(pair) = best_ipv4 {
2258 result.push(pair);
2259 }
2260 if let Some(pair) = best_ipv6 {
2261 result.push(pair);
2262 }
2263 result
2264 }
2265
2266 pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2268 let mut candidates: Vec<_> = self
2269 .remote_candidates
2270 .iter()
2271 .filter(|(_, c)| c.state == CandidateState::New)
2272 .map(|(k, v)| (*k, v))
2273 .collect();
2274
2275 candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2277 candidates
2278 }
2279
2280 pub(super) fn start_validation(
2282 &mut self,
2283 sequence: VarInt,
2284 challenge: u64,
2285 now: Instant,
2286 ) -> Result<(), NatTraversalError> {
2287 let candidate = self
2288 .remote_candidates
2289 .get_mut(&sequence)
2290 .ok_or(NatTraversalError::UnknownCandidate)?;
2291
2292 if candidate.state != CandidateState::New {
2293 return Err(NatTraversalError::InvalidCandidateState);
2294 }
2295
2296 if Self::is_validation_suspicious(candidate, now) {
2298 self.stats.security_rejections += 1;
2299 debug!(
2300 "Suspicious validation attempt rejected for address {}",
2301 candidate.address
2302 );
2303 return Err(NatTraversalError::SecurityValidationFailed);
2304 }
2305
2306 if self.active_validations.len() >= 10 {
2308 debug!(
2309 "Too many concurrent validations, rejecting new validation for {}",
2310 candidate.address
2311 );
2312 return Err(NatTraversalError::SecurityValidationFailed);
2313 }
2314
2315 candidate.state = CandidateState::Validating;
2317 candidate.attempt_count += 1;
2318 candidate.last_attempt = Some(now);
2319
2320 let validation = PathValidationState {
2322 challenge,
2323 sent_at: now,
2324 retry_count: 0,
2325 max_retries: 3, coordination_round: self.coordination.as_ref().map(|c| c.round),
2327 timeout_state: AdaptiveTimeoutState::new(),
2328 last_retry_at: None,
2329 };
2330
2331 self.active_validations
2332 .insert(candidate.address, validation);
2333 trace!(
2334 "Started validation for candidate {} with challenge {}",
2335 candidate.address, challenge
2336 );
2337 Ok(())
2338 }
2339
2340 fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2342 if candidate.attempt_count > 10 {
2344 return true;
2345 }
2346
2347 if let Some(last_attempt) = candidate.last_attempt {
2349 let time_since_last = now.duration_since(last_attempt);
2350 if time_since_last < Duration::from_millis(100) {
2351 return true; }
2353 }
2354
2355 if candidate.state == CandidateState::Failed {
2357 let time_since_discovery = now.duration_since(candidate.discovered_at);
2358 if time_since_discovery < Duration::from_secs(60) {
2359 return true; }
2361 }
2362
2363 false
2364 }
2365
2366 pub(super) fn handle_validation_success(
2368 &mut self,
2369 remote_addr: SocketAddr,
2370 challenge: u64,
2371 now: Instant,
2372 ) -> Result<VarInt, NatTraversalError> {
2373 let sequence = self
2375 .remote_candidates
2376 .iter()
2377 .find(|(_, c)| c.address == remote_addr)
2378 .map(|(seq, _)| *seq)
2379 .ok_or(NatTraversalError::UnknownCandidate)?;
2380
2381 let validation = self
2383 .active_validations
2384 .get_mut(&remote_addr)
2385 .ok_or(NatTraversalError::NoActiveValidation)?;
2386
2387 if validation.challenge != challenge {
2388 return Err(NatTraversalError::ChallengeMismatch);
2389 }
2390
2391 let rtt = now.duration_since(validation.sent_at);
2393 validation.timeout_state.update_success(rtt);
2394
2395 self.network_monitor.record_success(rtt, now);
2397
2398 let candidate = self
2400 .remote_candidates
2401 .get_mut(&sequence)
2402 .ok_or(NatTraversalError::UnknownCandidate)?;
2403
2404 candidate.state = CandidateState::Valid;
2405 self.active_validations.remove(&remote_addr);
2406 self.stats.validations_succeeded += 1;
2407
2408 trace!(
2409 "Validation successful for {} with RTT {:?}",
2410 remote_addr, rtt
2411 );
2412 Ok(sequence)
2413 }
2414
2415 pub(super) fn start_coordination_round(
2417 &mut self,
2418 targets: Vec<PunchTarget>,
2419 now: Instant,
2420 ) -> Result<VarInt, NatTraversalError> {
2421 if self.security_state.is_coordination_rate_limited(now) {
2423 self.stats.rate_limit_violations += 1;
2424 debug!(
2425 "Rate limit exceeded for coordination request with {} targets",
2426 targets.len()
2427 );
2428 return Err(NatTraversalError::RateLimitExceeded);
2429 }
2430
2431 if self.is_coordination_suspicious(&targets, now) {
2433 self.stats.suspicious_coordination_attempts += 1;
2434 self.stats.security_rejections += 1;
2435 debug!(
2436 "Suspicious coordination request rejected with {} targets",
2437 targets.len()
2438 );
2439 return Err(NatTraversalError::SuspiciousCoordination);
2440 }
2441
2442 for target in &targets {
2444 match self
2445 .security_state
2446 .validate_address(target.remote_addr, now)
2447 {
2448 AddressValidationResult::Invalid => {
2449 self.stats.invalid_address_rejections += 1;
2450 self.stats.security_rejections += 1;
2451 debug!(
2452 "Invalid target address in coordination: {}",
2453 target.remote_addr
2454 );
2455 return Err(NatTraversalError::InvalidAddress);
2456 }
2457 AddressValidationResult::Suspicious => {
2458 self.stats.security_rejections += 1;
2459 debug!(
2460 "Suspicious target address in coordination: {}",
2461 target.remote_addr
2462 );
2463 return Err(NatTraversalError::SecurityValidationFailed);
2464 }
2465 AddressValidationResult::Valid => {
2466 }
2468 }
2469 }
2470
2471 let round = self.next_sequence;
2472 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2473 .expect("sequence number overflow");
2474
2475 let coordination_grace = Duration::from_millis(500); let punch_start = now + coordination_grace;
2478
2479 self.coordination = Some(CoordinationState {
2480 round,
2481 punch_targets: targets,
2482 round_start: now,
2483 punch_start,
2484 round_duration: self.coordination_timeout,
2485 state: CoordinationPhase::Requesting,
2486 punch_request_sent: false,
2487 peer_punch_received: false,
2488 retry_count: 0,
2489 max_retries: 3,
2490 timeout_state: AdaptiveTimeoutState::new(),
2491 last_retry_at: None,
2492 });
2493
2494 self.stats.coordination_rounds += 1;
2495 trace!(
2496 "Started coordination round {} with {} targets",
2497 round,
2498 self.coordination.as_ref().unwrap().punch_targets.len()
2499 );
2500 Ok(round)
2501 }
2502
2503 fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2505 if targets.len() > 20 {
2507 return true;
2508 }
2509
2510 let mut seen_addresses = std::collections::HashSet::new();
2512 for target in targets {
2513 if !seen_addresses.insert(target.remote_addr) {
2514 return true; }
2516 }
2517
2518 if targets.len() > 5 {
2520 let mut ipv4_addresses: Vec<_> = targets
2522 .iter()
2523 .filter_map(|t| match t.remote_addr.ip() {
2524 IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2525 _ => None,
2526 })
2527 .collect();
2528
2529 if ipv4_addresses.len() >= 3 {
2530 ipv4_addresses.sort();
2531 let mut sequential_count = 1;
2532 for i in 1..ipv4_addresses.len() {
2533 if ipv4_addresses[i] == ipv4_addresses[i - 1] + 1 {
2534 sequential_count += 1;
2535 if sequential_count >= 3 {
2536 return true; }
2538 } else {
2539 sequential_count = 1;
2540 }
2541 }
2542 }
2543 }
2544
2545 false
2546 }
2547
2548 pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2550 self.coordination.as_ref().map(|c| c.state)
2551 }
2552
2553 pub(super) fn should_send_punch_request(&self) -> bool {
2555 if let Some(coord) = &self.coordination {
2556 coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2557 } else {
2558 false
2559 }
2560 }
2561
2562 pub(super) fn mark_punch_request_sent(&mut self) {
2564 if let Some(coord) = &mut self.coordination {
2565 coord.punch_request_sent = true;
2566 coord.state = CoordinationPhase::Coordinating;
2567 trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2568 }
2569 }
2570
2571 pub(super) fn handle_peer_punch_request(
2573 &mut self,
2574 peer_round: VarInt,
2575 now: Instant,
2576 ) -> Result<bool, NatTraversalError> {
2577 if self.is_peer_coordination_suspicious(peer_round, now) {
2579 self.stats.suspicious_coordination_attempts += 1;
2580 self.stats.security_rejections += 1;
2581 debug!(
2582 "Suspicious peer coordination request rejected for round {}",
2583 peer_round
2584 );
2585 return Err(NatTraversalError::SuspiciousCoordination);
2586 }
2587
2588 if let Some(coord) = &mut self.coordination {
2589 if coord.round == peer_round {
2590 match coord.state {
2591 CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2592 coord.peer_punch_received = true;
2593 coord.state = CoordinationPhase::Preparing;
2594
2595 let network_rtt = self
2597 .network_monitor
2598 .get_estimated_rtt()
2599 .unwrap_or(Duration::from_millis(100));
2600 let quality_score = self.network_monitor.get_quality_score();
2601
2602 let base_grace = Duration::from_millis(150);
2604 let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2605 let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2606
2607 let adaptive_grace = Duration::from_millis(
2608 (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64,
2609 );
2610
2611 coord.punch_start = now + adaptive_grace;
2612
2613 trace!(
2614 "Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2615 adaptive_grace, network_rtt, quality_score
2616 );
2617 Ok(true)
2618 }
2619 CoordinationPhase::Preparing => {
2620 trace!("Peer coordination confirmed during preparation");
2622 Ok(true)
2623 }
2624 _ => {
2625 debug!(
2626 "Received coordination in unexpected phase: {:?}",
2627 coord.state
2628 );
2629 Ok(false)
2630 }
2631 }
2632 } else {
2633 debug!(
2634 "Received coordination for wrong round: {} vs {}",
2635 peer_round, coord.round
2636 );
2637 Ok(false)
2638 }
2639 } else {
2640 debug!("Received peer coordination but no active round");
2641 Ok(false)
2642 }
2643 }
2644
2645 fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2647 if peer_round.into_inner() == 0 {
2649 return true; }
2651
2652 if let Some(coord) = &self.coordination {
2654 let our_round = coord.round.into_inner();
2655 let peer_round_num = peer_round.into_inner();
2656
2657 if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2659 return true;
2660 }
2661 }
2662
2663 false
2664 }
2665
2666 pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2668 if let Some(coord) = &self.coordination {
2669 match coord.state {
2670 CoordinationPhase::Preparing => now >= coord.punch_start,
2671 CoordinationPhase::Coordinating => {
2672 coord.peer_punch_received && now >= coord.punch_start
2674 }
2675 _ => false,
2676 }
2677 } else {
2678 false
2679 }
2680 }
2681
2682 pub(super) fn start_punching_phase(&mut self, now: Instant) {
2684 if let Some(coord) = &mut self.coordination {
2685 coord.state = CoordinationPhase::Punching;
2686
2687 let network_rtt = self
2689 .network_monitor
2690 .get_estimated_rtt()
2691 .unwrap_or(Duration::from_millis(100));
2692
2693 let jitter_ms: u64 = rand::random::<u64>() % 11;
2695 let jitter = Duration::from_millis(jitter_ms);
2696 let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2697
2698 coord.punch_start = transmission_time.max(now);
2700
2701 trace!(
2702 "Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2703 coord.punch_start, network_rtt, jitter
2704 );
2705 }
2706 }
2707
2708 pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2710 self.coordination
2711 .as_ref()
2712 .map(|c| c.punch_targets.as_slice())
2713 }
2714
2715 pub(super) fn mark_coordination_validating(&mut self) {
2717 if let Some(coord) = &mut self.coordination {
2718 if coord.state == CoordinationPhase::Punching {
2719 coord.state = CoordinationPhase::Validating;
2720 trace!("Coordination moved to validation phase");
2721 }
2722 }
2723 }
2724
2725 pub(super) fn handle_coordination_success(
2727 &mut self,
2728 remote_addr: SocketAddr,
2729 now: Instant,
2730 ) -> bool {
2731 if let Some(coord) = &mut self.coordination {
2732 let was_target = coord
2734 .punch_targets
2735 .iter()
2736 .any(|target| target.remote_addr == remote_addr);
2737
2738 if was_target && coord.state == CoordinationPhase::Validating {
2739 let rtt = now.duration_since(coord.round_start);
2741 coord.timeout_state.update_success(rtt);
2742 self.network_monitor.record_success(rtt, now);
2743
2744 coord.state = CoordinationPhase::Succeeded;
2745 self.stats.direct_connections += 1;
2746 trace!(
2747 "Coordination succeeded via {} with RTT {:?}",
2748 remote_addr, rtt
2749 );
2750 true
2751 } else {
2752 false
2753 }
2754 } else {
2755 false
2756 }
2757 }
2758
2759 pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2761 if let Some(coord) = &mut self.coordination {
2762 coord.retry_count += 1;
2763 coord.timeout_state.update_timeout();
2764 self.network_monitor.record_timeout(now);
2765
2766 if coord.timeout_state.should_retry(coord.max_retries)
2768 && self.network_monitor.is_suitable_for_coordination()
2769 {
2770 coord.state = CoordinationPhase::Requesting;
2772 coord.punch_request_sent = false;
2773 coord.peer_punch_received = false;
2774 coord.round_start = now;
2775 coord.last_retry_at = Some(now);
2776
2777 let retry_delay = coord.timeout_state.get_retry_delay();
2779
2780 let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2782 let adjusted_delay = Duration::from_millis(
2783 (retry_delay.as_millis() as f64 * quality_multiplier) as u64,
2784 );
2785
2786 coord.punch_start = now + adjusted_delay;
2787
2788 trace!(
2789 "Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2790 coord.round,
2791 coord.retry_count + 1,
2792 adjusted_delay,
2793 self.network_monitor.get_quality_score()
2794 );
2795 true
2796 } else {
2797 coord.state = CoordinationPhase::Failed;
2798 self.stats.coordination_failures += 1;
2799
2800 if !self.network_monitor.is_suitable_for_coordination() {
2801 trace!(
2802 "Coordination failed due to poor network conditions (quality: {:.2})",
2803 self.network_monitor.get_quality_score()
2804 );
2805 } else {
2806 trace!("Coordination failed after {} attempts", coord.retry_count);
2807 }
2808 false
2809 }
2810 } else {
2811 false
2812 }
2813 }
2814
2815 pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2817 if let Some(coord) = &mut self.coordination {
2818 let timeout = coord.timeout_state.get_timeout();
2819 let elapsed = now.duration_since(coord.round_start);
2820
2821 if elapsed > timeout {
2822 trace!(
2823 "Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2824 coord.round, elapsed, timeout
2825 );
2826 self.handle_coordination_failure(now);
2827 true
2828 } else {
2829 false
2830 }
2831 } else {
2832 false
2833 }
2834 }
2835
2836 #[allow(dead_code)] pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2839 let mut expired_validations = Vec::new();
2840 let mut retry_validations = Vec::new();
2841
2842 for (addr, validation) in &mut self.active_validations {
2843 let timeout = validation.timeout_state.get_timeout();
2844 let elapsed = now.duration_since(validation.sent_at);
2845
2846 if elapsed >= timeout {
2847 if validation
2848 .timeout_state
2849 .should_retry(validation.max_retries)
2850 {
2851 retry_validations.push(*addr);
2853 } else {
2854 expired_validations.push(*addr);
2856 }
2857 }
2858 }
2859
2860 for addr in retry_validations {
2862 if let Some(validation) = self.active_validations.get_mut(&addr) {
2863 validation.retry_count += 1;
2864 validation.sent_at = now;
2865 validation.last_retry_at = Some(now);
2866 validation.timeout_state.update_timeout();
2867
2868 trace!(
2869 "Retrying validation for {} (attempt {})",
2870 addr,
2871 validation.retry_count + 1
2872 );
2873 }
2874 }
2875
2876 for addr in &expired_validations {
2878 self.active_validations.remove(addr);
2879 self.network_monitor.record_timeout(now);
2880 trace!("Validation expired for {}", addr);
2881 }
2882
2883 expired_validations
2884 }
2885
2886 #[allow(dead_code)] pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2889 let mut retry_addresses = Vec::new();
2890
2891 for (addr, validation) in &mut self.active_validations {
2893 let elapsed = now.duration_since(validation.sent_at);
2894 let timeout = validation.timeout_state.get_timeout();
2895
2896 if elapsed > timeout
2897 && validation
2898 .timeout_state
2899 .should_retry(validation.max_retries)
2900 {
2901 validation.retry_count += 1;
2903 validation.last_retry_at = Some(now);
2904 validation.sent_at = now; validation.timeout_state.update_timeout();
2906
2907 retry_addresses.push(*addr);
2908 trace!(
2909 "Scheduled retry {} for validation to {}",
2910 validation.retry_count, addr
2911 );
2912 }
2913 }
2914
2915 retry_addresses
2916 }
2917
2918 #[allow(dead_code)] pub(super) fn update_network_conditions(&mut self, now: Instant) {
2921 self.network_monitor.cleanup(now);
2922
2923 let multiplier = self.network_monitor.get_timeout_multiplier();
2925
2926 for validation in self.active_validations.values_mut() {
2928 if multiplier > 1.5 {
2929 validation.timeout_state.backoff_multiplier =
2931 (validation.timeout_state.backoff_multiplier * 1.2)
2932 .min(validation.timeout_state.max_backoff_multiplier);
2933 } else if multiplier < 0.8 {
2934 validation.timeout_state.backoff_multiplier =
2936 (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2937 }
2938 }
2939 }
2940
2941 #[allow(dead_code)] pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2944 if let Some(coord) = &self.coordination {
2945 if coord.retry_count > 0 {
2946 if let Some(last_retry) = coord.last_retry_at {
2947 let retry_delay = coord.timeout_state.get_retry_delay();
2948 return now.duration_since(last_retry) >= retry_delay;
2949 }
2950 }
2951 }
2952 false
2953 }
2954
2955 #[allow(dead_code)] pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2958 self.resource_manager.update_stats(
2960 self.active_validations.len(),
2961 self.local_candidates.len(),
2962 self.remote_candidates.len(),
2963 self.candidate_pairs.len(),
2964 );
2965
2966 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2968 self.active_validations.len(),
2969 self.local_candidates.len(),
2970 self.remote_candidates.len(),
2971 self.candidate_pairs.len(),
2972 );
2973
2974 let mut cleaned = 0;
2976
2977 if self.resource_manager.should_cleanup(now) {
2978 cleaned += self.resource_manager.cleanup_expired_resources(
2979 &mut self.active_validations,
2980 &mut self.local_candidates,
2981 &mut self.remote_candidates,
2982 &mut self.candidate_pairs,
2983 &mut self.coordination,
2984 now,
2985 );
2986
2987 if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2989 cleaned += self.resource_manager.aggressive_cleanup(
2990 &mut self.active_validations,
2991 &mut self.local_candidates,
2992 &mut self.remote_candidates,
2993 &mut self.candidate_pairs,
2994 now,
2995 );
2996 }
2997 }
2998
2999 cleaned
3000 }
3001
3002 pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
3004 self.resource_manager.update_stats(
3006 self.active_validations.len(),
3007 self.local_candidates.len(),
3008 self.remote_candidates.len(),
3009 self.candidate_pairs.len(),
3010 );
3011 let memory_pressure = self.resource_manager.calculate_memory_pressure(
3012 self.active_validations.len(),
3013 self.local_candidates.len(),
3014 self.remote_candidates.len(),
3015 self.candidate_pairs.len(),
3016 );
3017
3018 if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
3020 self.resource_manager.stats.allocation_failures += 1;
3021 return true;
3022 }
3023
3024 if self.resource_manager.check_resource_limits(self) {
3026 self.resource_manager.stats.allocation_failures += 1;
3027 return true;
3028 }
3029
3030 false
3031 }
3032
3033 pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
3035 let mut next_timeout = None;
3036
3037 if let Some(coord) = &self.coordination {
3039 match coord.state {
3040 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
3041 let timeout_at = coord.round_start + self.coordination_timeout;
3042 next_timeout =
3043 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
3044 }
3045 CoordinationPhase::Preparing => {
3046 next_timeout = Some(
3048 next_timeout
3049 .map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)),
3050 );
3051 }
3052 CoordinationPhase::Punching | CoordinationPhase::Validating => {
3053 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
3055 next_timeout =
3056 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
3057 }
3058 _ => {}
3059 }
3060 }
3061
3062 for (_, validation) in &self.active_validations {
3064 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
3065 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
3066 }
3067
3068 if self.resource_manager.should_cleanup(now) {
3070 let cleanup_at = now + Duration::from_secs(1);
3072 next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
3073 }
3074
3075 next_timeout
3076 }
3077
3078 pub(super) fn handle_timeout(
3080 &mut self,
3081 now: Instant,
3082 ) -> Result<Vec<TimeoutAction>, NatTraversalError> {
3083 let mut actions = Vec::new();
3084
3085 if let Some(coord) = &mut self.coordination {
3087 match coord.state {
3088 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
3089 let timeout_at = coord.round_start + self.coordination_timeout;
3090 if now >= timeout_at {
3091 coord.retry_count += 1;
3092 if coord.retry_count >= coord.max_retries {
3093 debug!("Coordination failed after {} retries", coord.retry_count);
3094 coord.state = CoordinationPhase::Failed;
3095 actions.push(TimeoutAction::Failed);
3096 } else {
3097 debug!(
3098 "Coordination timeout, retrying ({}/{})",
3099 coord.retry_count, coord.max_retries
3100 );
3101 coord.state = CoordinationPhase::Requesting;
3102 coord.round_start = now;
3103 actions.push(TimeoutAction::RetryCoordination);
3104 }
3105 }
3106 }
3107 CoordinationPhase::Preparing => {
3108 if now >= coord.punch_start {
3110 debug!("Starting coordinated hole punching");
3111 coord.state = CoordinationPhase::Punching;
3112 actions.push(TimeoutAction::StartValidation);
3113 }
3114 }
3115 CoordinationPhase::Punching | CoordinationPhase::Validating => {
3116 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
3117 if now >= timeout_at {
3118 coord.retry_count += 1;
3119 if coord.retry_count >= coord.max_retries {
3120 debug!("Validation failed after {} retries", coord.retry_count);
3121 coord.state = CoordinationPhase::Failed;
3122 actions.push(TimeoutAction::Failed);
3123 } else {
3124 debug!(
3125 "Validation timeout, retrying ({}/{})",
3126 coord.retry_count, coord.max_retries
3127 );
3128 coord.state = CoordinationPhase::Punching;
3129 actions.push(TimeoutAction::StartValidation);
3130 }
3131 }
3132 }
3133 CoordinationPhase::Succeeded => {
3134 actions.push(TimeoutAction::Complete);
3135 }
3136 CoordinationPhase::Failed => {
3137 actions.push(TimeoutAction::Failed);
3138 }
3139 _ => {}
3140 }
3141 }
3142
3143 let mut expired_validations = Vec::new();
3145 for (addr, validation) in &mut self.active_validations {
3146 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
3147 if now >= timeout_at {
3148 validation.retry_count += 1;
3149 if validation.retry_count >= validation.max_retries {
3150 debug!("Path validation failed for {}: max retries exceeded", addr);
3151 expired_validations.push(*addr);
3152 } else {
3153 debug!(
3154 "Path validation timeout for {}, retrying ({}/{})",
3155 addr, validation.retry_count, validation.max_retries
3156 );
3157 validation.sent_at = now;
3158 validation.last_retry_at = Some(now);
3159 actions.push(TimeoutAction::StartValidation);
3160 }
3161 }
3162 }
3163
3164 for addr in expired_validations {
3166 self.active_validations.remove(&addr);
3167 }
3168
3169 if self.resource_manager.should_cleanup(now) {
3171 self.resource_manager.perform_cleanup(now);
3172 }
3173
3174 self.network_monitor.update_quality_score(now);
3176
3177 if self.coordination.is_none()
3179 && !self.local_candidates.is_empty()
3180 && !self.remote_candidates.is_empty()
3181 {
3182 actions.push(TimeoutAction::RetryDiscovery);
3183 }
3184
3185 Ok(actions)
3186 }
3187
3188 #[allow(dead_code)] pub(super) fn handle_address_observation(
3194 &mut self,
3195 peer_id: [u8; 32],
3196 observed_address: SocketAddr,
3197 connection_id: crate::shared::ConnectionId,
3198 peer_role: NatTraversalRole,
3199 now: Instant,
3200 ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
3201 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3202 let connection_context = ConnectionContext {
3203 connection_id,
3204 original_destination: observed_address, peer_role,
3206 transport_params: None,
3207 };
3208
3209 bootstrap_coordinator.observe_peer_address(
3211 peer_id,
3212 observed_address,
3213 connection_context,
3214 now,
3215 )?;
3216
3217 let sequence = self.next_sequence;
3219 self.next_sequence =
3220 VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3221
3222 let priority = VarInt::from_u32(100); let add_address_frame =
3224 bootstrap_coordinator.generate_add_address_frame(peer_id, sequence, priority);
3225
3226 Ok(add_address_frame)
3227 } else {
3228 Ok(None)
3230 }
3231 }
3232
3233 pub(super) fn handle_punch_me_now_frame(
3238 &mut self,
3239 from_peer: [u8; 32],
3240 source_addr: SocketAddr,
3241 frame: &crate::frame::PunchMeNow,
3242 now: Instant,
3243 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3244 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3245 bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3246 } else {
3247 Ok(None)
3249 }
3250 }
3251
3252 #[allow(dead_code)] pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3257 self.bootstrap_coordinator
3258 .as_ref()
3259 .and_then(|coord| coord.get_peer_record(peer_id))
3260 .map(|record| record.observed_address)
3261 }
3262
3263 pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3265 debug!("Starting candidate discovery for NAT traversal");
3266
3267 if self.local_candidates.is_empty() {
3269 debug!("Local candidates will be populated by discovery manager");
3272 }
3273
3274 Ok(())
3275 }
3276
3277 #[allow(dead_code)] pub(super) fn queue_add_address_frame(
3280 &mut self,
3281 sequence: VarInt,
3282 address: SocketAddr,
3283 priority: u32,
3284 ) -> Result<(), NatTraversalError> {
3285 debug!(
3286 "Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3287 sequence, address, priority
3288 );
3289
3290 let candidate = AddressCandidate {
3292 address,
3293 priority,
3294 source: CandidateSource::Local,
3295 discovered_at: Instant::now(),
3296 state: CandidateState::New,
3297 attempt_count: 0,
3298 last_attempt: None,
3299 };
3300
3301 if !self.local_candidates.values().any(|c| c.address == address) {
3303 self.local_candidates.insert(sequence, candidate);
3304 }
3305
3306 Ok(())
3307 }
3308}
3309
/// Errors reported by the NAT traversal state machine and the bootstrap
/// coordinator.
///
/// The human-readable text for each variant comes from the `Display`
/// implementation of this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub(crate) enum NatTraversalError {
    /// Displayed as "too many candidates".
    TooManyCandidates,
    /// Displayed as "duplicate address".
    DuplicateAddress,
    /// Displayed as "unknown candidate".
    UnknownCandidate,
    /// Displayed as "invalid candidate state".
    InvalidCandidateState,
    /// Displayed as "no active validation".
    NoActiveValidation,
    /// Displayed as "challenge mismatch".
    ChallengeMismatch,
    /// Displayed as "no active coordination".
    NoActiveCoordination,
    /// Returned by `BootstrapCoordinator::observe_peer_address` when address
    /// validation classifies the address as suspicious.
    SecurityValidationFailed,
    /// Returned when one of the security validator's rate limits is hit.
    RateLimitExceeded,
    /// Returned by `BootstrapCoordinator::observe_peer_address` when address
    /// validation classifies the address as invalid.
    InvalidAddress,
    /// Displayed as "suspicious coordination request".
    SuspiciousCoordination,
    /// Displayed as "resource limit exceeded".
    ResourceLimitExceeded,
}
3339
3340impl std::fmt::Display for NatTraversalError {
3341 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3342 match self {
3343 Self::TooManyCandidates => write!(f, "too many candidates"),
3344 Self::DuplicateAddress => write!(f, "duplicate address"),
3345 Self::UnknownCandidate => write!(f, "unknown candidate"),
3346 Self::InvalidCandidateState => write!(f, "invalid candidate state"),
3347 Self::NoActiveValidation => write!(f, "no active validation"),
3348 Self::ChallengeMismatch => write!(f, "challenge mismatch"),
3349 Self::NoActiveCoordination => write!(f, "no active coordination"),
3350 Self::SecurityValidationFailed => write!(f, "security validation failed"),
3351 Self::RateLimitExceeded => write!(f, "rate limit exceeded"),
3352 Self::InvalidAddress => write!(f, "invalid address"),
3353 Self::SuspiciousCoordination => write!(f, "suspicious coordination request"),
3354 Self::ResourceLimitExceeded => write!(f, "resource limit exceeded"),
3355 }
3356 }
3357}
3358
/// Marks `NatTraversalError` as a standard error type; all trait methods use
/// their default implementations.
impl std::error::Error for NatTraversalError {}
3360
/// Snapshot of security-related counters and gauges.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// type; the per-field notes are inferred from the field names — confirm
/// against the code that fills it in.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct SecurityStats {
    /// Presumably the total number of requests rejected for security reasons.
    pub total_security_rejections: u32,
    /// Presumably the number of rate-limit violations seen.
    pub rate_limit_violations: u32,
    /// Presumably the number of rejections caused by invalid addresses.
    pub invalid_address_rejections: u32,
    /// Presumably the number of coordination attempts flagged as suspicious.
    pub suspicious_coordination_attempts: u32,
    /// Presumably the number of validations in flight at snapshot time.
    pub active_validations: usize,
    /// Presumably the number of cached address-validation results.
    pub cached_address_validations: usize,
    /// Presumably the current candidate rate tracked by the rate limiter.
    pub current_candidate_rate: usize,
    /// Presumably the current coordination rate tracked by the rate limiter.
    pub current_coordination_rate: usize,
}
3382
/// State kept by a node that observes peer addresses and coordinates hole
/// punching between pairs of peers.
#[derive(Debug)]
pub(crate) struct BootstrapCoordinator {
    /// Latest observation record per peer; written by `observe_peer_address`
    /// and pruned by `cleanup_expired_sessions`.
    peer_registry: HashMap<PeerId, PeerObservationRecord>,
    /// Coordination sessions keyed by their randomly generated id.
    coordination_sessions: HashMap<CoordinationSessionId, CoordinationSession>,
    /// Queue of pending coordination requests; only initialised empty in the
    /// code visible here.
    #[allow(dead_code)]
    pending_coordination: VecDeque<PendingCoordinationRequest>,
    /// Per-address observation bookkeeping, updated by `observe_peer_address`.
    #[allow(dead_code)]
    address_observations: HashMap<SocketAddr, AddressObservation>,
    /// Performs address validation and rate limiting for incoming requests.
    security_validator: SecurityValidationState,
    /// Running counters exposed through `get_stats`.
    stats: BootstrapStats,
    /// Configuration passed to `new`; stored but not read in the visible code.
    _config: BootstrapConfig,
    /// Initialised to `None` by `new`; not updated in the visible code.
    _last_cleanup: Option<Instant>,
}
3408
/// Identifier of a coordination session; produced by `rand::random()` in
/// `BootstrapCoordinator::generate_session_id`.
type CoordinationSessionId = u64;
3411
/// 32-byte peer identifier used as the key of the peer registry.
type PeerId = [u8; 32];
3414
/// What the bootstrap coordinator knows about a single observed peer.
///
/// A fresh record replaces any previous one each time
/// `BootstrapCoordinator::observe_peer_address` succeeds for the peer.
#[derive(Debug, Clone)]
pub(crate) struct PeerObservationRecord {
    /// Identifier of the observed peer.
    #[allow(dead_code)]
    peer_id: PeerId,
    /// Address the peer was observed at; used when building ADD_ADDRESS and
    /// relayed PUNCH_ME_NOW frames.
    observed_address: SocketAddr,
    /// Time of the observation; records older than one hour are dropped by
    /// `cleanup_expired_sessions`.
    #[allow(dead_code)]
    observed_at: Instant,
    /// Connection details captured alongside the observation.
    #[allow(dead_code)]
    connection_context: ConnectionContext,
    /// Starts as `true`; cleared by `update_peer_coordination_stats` once the
    /// success rate drops below 0.1 after more than 10 coordinations.
    #[allow(dead_code)]
    can_coordinate: bool,
    /// Number of coordination outcomes recorded for this peer.
    #[allow(dead_code)]
    coordination_count: u32,
    /// Exponentially weighted success rate (weight 0.1 per outcome),
    /// starting at 1.0.
    #[allow(dead_code)]
    success_rate: f64,
}
3439
/// Connection details stored together with a peer observation.
#[derive(Debug, Clone)]
pub(crate) struct ConnectionContext {
    /// Identifier of the connection the observation was made on.
    #[allow(dead_code)]
    connection_id: ConnectionId,
    /// Set to the observed address by `handle_address_observation`.
    #[allow(dead_code)]
    original_destination: SocketAddr,
    /// NAT traversal role of the observed peer.
    #[allow(dead_code)]
    peer_role: NatTraversalRole,
    /// Peer transport parameters; `handle_address_observation` leaves this
    /// as `None`.
    #[allow(dead_code)]
    transport_params: Option<NatTraversalTransportParams>,
}
3456
/// NAT-traversal-related transport parameters of a peer.
///
/// NOTE(review): no code visible here constructs this type; field meanings
/// are inferred from their names — confirm.
#[derive(Debug, Clone)]
struct NatTraversalTransportParams {
    /// Presumably the maximum number of candidates the peer accepts.
    #[allow(dead_code)]
    max_candidates: u32,
    /// Presumably the coordination timeout the peer advertised.
    #[allow(dead_code)]
    coordination_timeout: Duration,
    /// Presumably whether the peer supports optional/advanced features.
    #[allow(dead_code)]
    supports_advanced_features: bool,
}
3470
/// Bookkeeping for a single observed socket address, maintained by
/// `BootstrapCoordinator::observe_peer_address`.
#[derive(Debug, Clone)]
struct AddressObservation {
    /// The observed address (also the key in `address_observations`).
    #[allow(dead_code)]
    address: SocketAddr,
    /// When the address was first observed; entries older than one hour are
    /// dropped by `cleanup_expired_sessions`.
    #[allow(dead_code)]
    first_observed: Instant,
    /// How many times the address has been observed.
    #[allow(dead_code)]
    observation_count: u32,
    /// Validation verdict; set to `Valid` when the entry is created.
    #[allow(dead_code)]
    validation_state: AddressValidationResult,
    /// Distinct peers that have been observed at this address.
    #[allow(dead_code)]
    associated_peers: Vec<PeerId>,
}
3490
/// A hole-punching coordination between two peers, tracked by the bootstrap
/// coordinator.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct CoordinationSession {
    /// Randomly generated identifier of this session.
    session_id: CoordinationSessionId,
    /// Peer whose PUNCH_ME_NOW frame created the session.
    peer_a: PeerId,
    /// Peer named as target in that frame.
    peer_b: PeerId,
    /// Round number taken from the creating frame.
    current_round: VarInt,
    /// Creation time; all phase timeouts are measured from this instant.
    started_at: Instant,
    /// Current phase of the session state machine.
    phase: CoordinationPhase,
    /// `(address, sequence)` pairs to punch towards; seeded from the
    /// creating frame's `local_address` and `target_sequence`.
    target_addresses: Vec<(SocketAddr, VarInt)>,
    /// Readiness flags of both peers.
    sync_state: SynchronizationState,
    /// Per-session counters.
    stats: CoordinationSessionStats,
}
3514
/// Readiness flags of the two peers of a coordination session.
#[derive(Debug, Clone)]
struct SynchronizationState {
    /// Set to `true` when the session is created from peer A's request.
    peer_a_ready: bool,
    /// Starts `false`; set by `process_punch_me_now_frame` when a frame
    /// without a target peer matches the session.
    peer_b_ready: bool,
}
3523
/// Counters kept per coordination session.
#[derive(Debug, Clone, Default)]
struct CoordinationSessionStats {
    /// Incremented by `process_punch_me_now_frame` when both peers are ready.
    successful_coordinations: u32,
}
3530
/// Placeholder element type of `BootstrapCoordinator::pending_coordination`;
/// it currently carries no data.
#[derive(Debug, Clone)]
struct PendingCoordinationRequest {
    /// Keeps the struct non-constructible outside this module without
    /// carrying information.
    _unused: (),
}
3536
/// Configuration handed to `BootstrapCoordinator::new`; it currently has no
/// settings.
#[derive(Debug, Clone)]
pub(crate) struct BootstrapConfig {
    /// Placeholder field; carries no information.
    _unused: (),
}
3542
/// Running counters maintained by `BootstrapCoordinator`.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapStats {
    /// Incremented on every successful `observe_peer_address` call.
    #[allow(dead_code)]
    total_observations: u64,
    /// Incremented each time a coordination session is created.
    total_coordinations: u64,
    /// Incremented when a session reaches the state where both peers are ready.
    successful_coordinations: u64,
    /// Size of the peer registry as of the last update.
    #[allow(dead_code)]
    active_peers: usize,
    /// Number of stored coordination sessions as of the last update.
    active_sessions: usize,
    /// Incremented whenever a request is rejected by a security check.
    security_rejections: u64,
}
3561
/// Events produced while driving coordination sessions; returned by
/// `BootstrapCoordinator::poll_session_state_machine`.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) enum CoordinationSessionEvent {
    /// A session moved from one phase to another.
    PhaseChanged {
        /// Session that changed phase.
        session_id: CoordinationSessionId,
        /// Phase before the transition.
        old_phase: CoordinationPhase,
        /// Phase after the transition.
        new_phase: CoordinationPhase,
    },
    /// A session failed, e.g. through a timeout or a state-advancement error.
    SessionFailed {
        /// Session that failed.
        session_id: CoordinationSessionId,
        /// First peer of the session.
        peer_a: PeerId,
        /// Second peer of the session.
        peer_b: PeerId,
        /// Human-readable failure description.
        reason: String,
    },
    /// Emitted when a session enters the punching phase.
    StartHolePunching {
        /// Session that starts punching.
        session_id: CoordinationSessionId,
        /// First peer of the session.
        peer_a: PeerId,
        /// Second peer of the session.
        peer_b: PeerId,
        /// Copy of the session's target `(address, sequence)` pairs.
        target_addresses: Vec<(SocketAddr, VarInt)>,
    },
    /// NOTE(review): presumably signals that a finished session can be
    /// removed; the emitting code is outside this section — confirm.
    ReadyForCleanup { session_id: CoordinationSessionId },
}
3589
/// Internal triggers computed by `BootstrapCoordinator::should_advance_session`
/// and consumed by the session state machine.
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
enum SessionAdvancementEvent {
    /// Requesting phase: both readiness flags are set.
    BothPeersReady,
    /// Coordinating phase: session is older than 500 ms.
    CoordinationComplete,
    /// Preparing phase: session is older than 1 s.
    PreparationComplete,
    /// Punching phase: session is older than 2 s.
    PunchingComplete,
    /// Validating phase: session is older than 10 s.
    ValidationTimeout,
    /// Requesting phase older than 10 s, or a session found in `Idle`.
    Timeout,
    /// Succeeded/Failed phase: session is older than 60 s.
    ReadyForCleanup,
}
3609
/// Possible reactions to a coordination problem.
///
/// NOTE(review): no code in this section produces or consumes these values;
/// the variant notes are inferred from their names — confirm.
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
pub(crate) enum CoordinationRecoveryAction {
    /// Presumably: nothing needs to be done.
    NoAction,
    /// Presumably: retry the coordination after a backoff delay.
    RetryWithBackoff,
    /// Presumably: give up and mark the coordination as failed.
    MarkAsFailed,
    /// Presumably: release the resources held by the coordination.
    Cleanup,
}
3623
3624impl BootstrapCoordinator {
3625 pub(crate) fn new(config: BootstrapConfig) -> Self {
3627 Self {
3628 peer_registry: HashMap::new(),
3629 coordination_sessions: HashMap::new(),
3630 pending_coordination: VecDeque::new(),
3631 address_observations: HashMap::new(),
3632 security_validator: SecurityValidationState::new(),
3633 stats: BootstrapStats::default(),
3634 _config: config,
3635 _last_cleanup: None,
3636 }
3637 }
3638
3639 #[allow(dead_code)] pub(crate) fn observe_peer_address(
3645 &mut self,
3646 peer_id: PeerId,
3647 observed_address: SocketAddr,
3648 connection_context: ConnectionContext,
3649 now: Instant,
3650 ) -> Result<(), NatTraversalError> {
3651 match self
3653 .security_validator
3654 .validate_address(observed_address, now)
3655 {
3656 AddressValidationResult::Valid => {}
3657 AddressValidationResult::Invalid => {
3658 self.stats.security_rejections += 1;
3659 return Err(NatTraversalError::InvalidAddress);
3660 }
3661 AddressValidationResult::Suspicious => {
3662 self.stats.security_rejections += 1;
3663 return Err(NatTraversalError::SecurityValidationFailed);
3664 }
3665 }
3666
3667 if self.security_validator.is_candidate_rate_limited(now) {
3669 self.stats.security_rejections += 1;
3670 return Err(NatTraversalError::RateLimitExceeded);
3671 }
3672
3673 let observation = self
3675 .address_observations
3676 .entry(observed_address)
3677 .or_insert_with(|| AddressObservation {
3678 address: observed_address,
3679 first_observed: now,
3680 observation_count: 0,
3681 validation_state: AddressValidationResult::Valid,
3682 associated_peers: Vec::new(),
3683 });
3684
3685 observation.observation_count += 1;
3686 if !observation.associated_peers.contains(&peer_id) {
3687 observation.associated_peers.push(peer_id);
3688 }
3689
3690 let peer_record = PeerObservationRecord {
3692 peer_id,
3693 observed_address,
3694 observed_at: now,
3695 connection_context,
3696 can_coordinate: true, coordination_count: 0,
3698 success_rate: 1.0,
3699 };
3700
3701 self.peer_registry.insert(peer_id, peer_record);
3702 self.stats.total_observations += 1;
3703 self.stats.active_peers = self.peer_registry.len();
3704
3705 debug!(
3706 "Observed peer {:?} at address {} (total observations: {})",
3707 peer_id, observed_address, self.stats.total_observations
3708 );
3709
3710 Ok(())
3711 }
3712
3713 #[allow(dead_code)] pub(crate) fn generate_add_address_frame(
3719 &self,
3720 peer_id: PeerId,
3721 sequence: VarInt,
3722 priority: VarInt,
3723 ) -> Option<crate::frame::AddAddress> {
3724 if let Some(peer_record) = self.peer_registry.get(&peer_id) {
3725 Some(crate::frame::AddAddress {
3726 sequence,
3727 address: peer_record.observed_address,
3728 priority,
3729 })
3730 } else {
3731 None
3732 }
3733 }
3734
3735 pub(crate) fn process_punch_me_now_frame(
3740 &mut self,
3741 from_peer: PeerId,
3742 source_addr: SocketAddr,
3743 frame: &crate::frame::PunchMeNow,
3744 now: Instant,
3745 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3746 if self
3748 .security_validator
3749 .is_adaptive_rate_limited(from_peer, now)
3750 {
3751 self.stats.security_rejections += 1;
3752 debug!(
3753 "PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3754 hex::encode(&from_peer[..8])
3755 );
3756 return Err(NatTraversalError::RateLimitExceeded);
3757 }
3758
3759 self.security_validator
3761 .enhanced_address_validation(frame.local_address, source_addr, now)
3762 .map_err(|e| {
3763 self.stats.security_rejections += 1;
3764 debug!(
3765 "PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3766 hex::encode(&from_peer[..8]),
3767 e
3768 );
3769 e
3770 })?;
3771
3772 self.security_validator
3774 .validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3775 .map_err(|e| {
3776 self.stats.security_rejections += 1;
3777 debug!(
3778 "PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3779 hex::encode(&from_peer[..8]),
3780 e
3781 );
3782 e
3783 })?;
3784
3785 if let Some(target_peer_id) = frame.target_peer_id {
3787 if let Some(target_peer) = self.peer_registry.get(&target_peer_id) {
3789 let session_id = self.generate_session_id();
3791
3792 if !self.coordination_sessions.contains_key(&session_id) {
3793 let _network_rtt = self
3795 .estimate_peer_rtt(&from_peer)
3796 .unwrap_or(Duration::from_millis(100));
3797
3798 let session = CoordinationSession {
3799 session_id,
3800 peer_a: from_peer,
3801 peer_b: target_peer_id,
3802 current_round: frame.round,
3803 started_at: now,
3804 phase: CoordinationPhase::Requesting,
3805 target_addresses: vec![(frame.local_address, frame.target_sequence)],
3806 sync_state: SynchronizationState {
3807 peer_a_ready: true, peer_b_ready: false,
3809 },
3810 stats: CoordinationSessionStats::default(),
3811 };
3812
3813 self.coordination_sessions.insert(session_id, session);
3814 self.stats.total_coordinations += 1;
3815 self.stats.active_sessions = self.coordination_sessions.len();
3816 }
3817
3818 let coordination_frame = crate::frame::PunchMeNow {
3820 round: frame.round,
3821 target_sequence: frame.target_sequence,
3822 local_address: target_peer.observed_address,
3823 target_peer_id: Some(from_peer),
3824 };
3825
3826 info!(
3827 "Coordinating hole punch between {:?} and {:?} (round: {})",
3828 from_peer, target_peer_id, frame.round
3829 );
3830
3831 Ok(Some(coordination_frame))
3832 } else {
3833 warn!(
3835 "Target peer {:?} not found for coordination from {:?}",
3836 target_peer_id, from_peer
3837 );
3838 Ok(None)
3839 }
3840 } else {
3841 let session_id = if let Some(session) =
3843 self.find_coordination_session_by_peer(from_peer, frame.round)
3844 {
3845 session.sync_state.peer_b_ready = true;
3846
3847 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3849 session.phase = CoordinationPhase::Punching;
3850 session.stats.successful_coordinations += 1;
3851 Some(session.session_id)
3852 } else {
3853 None
3854 }
3855 } else {
3856 None
3857 };
3858
3859 if let Some(session_id) = session_id {
3861 self.stats.successful_coordinations += 1;
3862 info!(
3863 "Coordination complete for session {} (round: {})",
3864 session_id, frame.round
3865 );
3866 }
3867
3868 Ok(None)
3869 }
3870 }
3871
3872 fn find_coordination_session_by_peer(
3874 &mut self,
3875 peer_id: PeerId,
3876 round: VarInt,
3877 ) -> Option<&mut CoordinationSession> {
3878 self.coordination_sessions.values_mut().find(|session| {
3879 (session.peer_a == peer_id || session.peer_b == peer_id)
3880 && session.current_round == round
3881 })
3882 }
3883
3884 fn generate_session_id(&self) -> CoordinationSessionId {
3886 rand::random()
3887 }
3888
3889 #[allow(dead_code)] pub(crate) fn generate_secure_coordination_round(&self) -> VarInt {
3892 self.security_validator.generate_secure_coordination_round()
3893 }
3894
3895 #[allow(dead_code)] pub(crate) fn validate_coordination_security(
3898 &mut self,
3899 peer_id: PeerId,
3900 source_addr: SocketAddr,
3901 target_addr: SocketAddr,
3902 now: Instant,
3903 ) -> Result<(), NatTraversalError> {
3904 if self
3906 .security_validator
3907 .is_adaptive_rate_limited(peer_id, now)
3908 {
3909 self.stats.security_rejections += 1;
3910 return Err(NatTraversalError::RateLimitExceeded);
3911 }
3912
3913 self.security_validator
3915 .enhanced_address_validation(target_addr, source_addr, now)?;
3916
3917 self.security_validator
3919 .validate_amplification_limits(source_addr, target_addr, now)?;
3920
3921 Ok(())
3922 }
3923
3924 #[allow(dead_code)] pub(crate) fn cleanup_expired_sessions(&mut self, now: Instant) {
3927 let session_timeout = Duration::from_secs(300); let expired_sessions: Vec<CoordinationSessionId> = self
3931 .coordination_sessions
3932 .iter()
3933 .filter(|(_, session)| now.duration_since(session.started_at) > session_timeout)
3934 .map(|(&session_id, _)| session_id)
3935 .collect();
3936
3937 for session_id in expired_sessions {
3939 if let Some(session) = self.coordination_sessions.remove(&session_id) {
3940 debug!(
3941 "Cleaned up expired coordination session {} between {:?} and {:?}",
3942 session_id,
3943 hex::encode(&session.peer_a[..8]),
3944 hex::encode(&session.peer_b[..8])
3945 );
3946 }
3947 }
3948
3949 self.stats.active_sessions = self.coordination_sessions.len();
3951
3952 let observation_timeout = Duration::from_secs(3600); self.peer_registry
3955 .retain(|_, record| now.duration_since(record.observed_at) <= observation_timeout);
3956
3957 self.stats.active_peers = self.peer_registry.len();
3959
3960 self.address_observations.retain(|_, observation| {
3962 now.duration_since(observation.first_observed) <= observation_timeout
3963 });
3964 }
3965
3966 #[allow(dead_code)] pub(crate) fn get_stats(&self) -> &BootstrapStats {
3969 &self.stats
3970 }
3971
3972 #[allow(dead_code)] pub(crate) fn update_peer_coordination_stats(&mut self, peer_id: PeerId, success: bool) {
3975 if let Some(peer_record) = self.peer_registry.get_mut(&peer_id) {
3976 peer_record.coordination_count += 1;
3977
3978 if success {
3979 let alpha = 0.1; peer_record.success_rate = peer_record.success_rate * (1.0 - alpha) + alpha;
3982 } else {
3983 let alpha = 0.1;
3985 peer_record.success_rate = peer_record.success_rate * (1.0 - alpha);
3986 }
3987
3988 if peer_record.success_rate < 0.1 && peer_record.coordination_count > 10 {
3990 peer_record.can_coordinate = false;
3991 warn!(
3992 "Disabled coordination for peer {:?} due to low success rate: {:.2}",
3993 hex::encode(&peer_id[..8]),
3994 peer_record.success_rate
3995 );
3996 }
3997 }
3998 }
3999
4000 #[allow(dead_code)] pub(crate) fn poll_session_state_machine(
4006 &mut self,
4007 now: Instant,
4008 ) -> Vec<CoordinationSessionEvent> {
4009 let mut events = Vec::new();
4010 let mut sessions_to_update = Vec::new();
4011
4012 for (&session_id, session) in &self.coordination_sessions {
4014 if let Some(event) = self.should_advance_session(session, now) {
4015 sessions_to_update.push((session_id, event));
4016 }
4017 }
4018
4019 for (session_id, event) in sessions_to_update {
4021 let session_events =
4022 if let Some(session) = self.coordination_sessions.get_mut(&session_id) {
4023 let peer_a = session.peer_a;
4024 let peer_b = session.peer_b;
4025
4026 match Self::advance_session_state_static(session, event, now) {
4027 Ok(session_events) => session_events,
4028 Err(e) => {
4029 warn!("Failed to advance session {} state: {:?}", session_id, e);
4030 session.phase = CoordinationPhase::Failed;
4032 vec![CoordinationSessionEvent::SessionFailed {
4033 session_id,
4034 peer_a,
4035 peer_b,
4036 reason: format!("State advancement error: {:?}", e),
4037 }]
4038 }
4039 }
4040 } else {
4041 Vec::new()
4042 };
4043
4044 events.extend(session_events);
4045 }
4046
4047 self.cleanup_completed_sessions(now);
4049
4050 events
4051 }
4052
4053 #[allow(dead_code)] fn should_advance_session(
4056 &self,
4057 session: &CoordinationSession,
4058 now: Instant,
4059 ) -> Option<SessionAdvancementEvent> {
4060 let session_age = now.duration_since(session.started_at);
4061
4062 match session.phase {
4063 CoordinationPhase::Requesting => {
4064 if session_age > Duration::from_secs(10) {
4066 Some(SessionAdvancementEvent::Timeout)
4067 } else if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4068 Some(SessionAdvancementEvent::BothPeersReady)
4069 } else {
4070 None
4071 }
4072 }
4073 CoordinationPhase::Coordinating => {
4074 if session_age > Duration::from_millis(500) {
4076 Some(SessionAdvancementEvent::CoordinationComplete)
4077 } else {
4078 None
4079 }
4080 }
4081 CoordinationPhase::Preparing => {
4082 if session_age > Duration::from_secs(1) {
4084 Some(SessionAdvancementEvent::PreparationComplete)
4085 } else {
4086 None
4087 }
4088 }
4089 CoordinationPhase::Punching => {
4090 if session_age > Duration::from_secs(2) {
4092 Some(SessionAdvancementEvent::PunchingComplete)
4093 } else {
4094 None
4095 }
4096 }
4097 CoordinationPhase::Validating => {
4098 if session_age > Duration::from_secs(10) {
4100 Some(SessionAdvancementEvent::ValidationTimeout)
4101 } else {
4102 None
4103 }
4104 }
4105 CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
4106 if session_age > Duration::from_secs(60) {
4108 Some(SessionAdvancementEvent::ReadyForCleanup)
4109 } else {
4110 None
4111 }
4112 }
4113 CoordinationPhase::Idle => {
4114 Some(SessionAdvancementEvent::Timeout)
4116 }
4117 }
4118 }
4119
4120 #[allow(dead_code)] fn advance_session_state_static(
4123 session: &mut CoordinationSession,
4124 event: SessionAdvancementEvent,
4125 _now: Instant,
4126 ) -> Result<Vec<CoordinationSessionEvent>, NatTraversalError> {
4127 let mut events = Vec::new();
4128 let previous_phase = session.phase;
4129
4130 match (session.phase, event) {
4131 (CoordinationPhase::Requesting, SessionAdvancementEvent::BothPeersReady) => {
4132 session.phase = CoordinationPhase::Coordinating;
4133 debug!(
4134 "Session {} advanced from Requesting to Coordinating",
4135 session.session_id
4136 );
4137 events.push(CoordinationSessionEvent::PhaseChanged {
4138 session_id: session.session_id,
4139 old_phase: previous_phase,
4140 new_phase: session.phase,
4141 });
4142 }
4143 (CoordinationPhase::Requesting, SessionAdvancementEvent::Timeout) => {
4144 session.phase = CoordinationPhase::Failed;
4145 warn!(
4146 "Session {} timed out in Requesting phase",
4147 session.session_id
4148 );
4149 events.push(CoordinationSessionEvent::SessionFailed {
4150 session_id: session.session_id,
4151 peer_a: session.peer_a,
4152 peer_b: session.peer_b,
4153 reason: "Timeout waiting for peer responses".to_string(),
4154 });
4155 }
4156 (CoordinationPhase::Coordinating, SessionAdvancementEvent::CoordinationComplete) => {
4157 session.phase = CoordinationPhase::Preparing;
4158 debug!(
4159 "Session {} advanced from Coordinating to Preparing",
4160 session.session_id
4161 );
4162 events.push(CoordinationSessionEvent::PhaseChanged {
4163 session_id: session.session_id,
4164 old_phase: previous_phase,
4165 new_phase: session.phase,
4166 });
4167 }
4168 (CoordinationPhase::Preparing, SessionAdvancementEvent::PreparationComplete) => {
4169 session.phase = CoordinationPhase::Punching;
4170 debug!(
4171 "Session {} advanced from Preparing to Punching",
4172 session.session_id
4173 );
4174 events.push(CoordinationSessionEvent::PhaseChanged {
4175 session_id: session.session_id,
4176 old_phase: previous_phase,
4177 new_phase: session.phase,
4178 });
4179 events.push(CoordinationSessionEvent::StartHolePunching {
4180 session_id: session.session_id,
4181 peer_a: session.peer_a,
4182 peer_b: session.peer_b,
4183 target_addresses: session.target_addresses.clone(),
4184 });
4185 }
4186 (CoordinationPhase::Punching, SessionAdvancementEvent::PunchingComplete) => {
4187 session.phase = CoordinationPhase::Validating;
4188 debug!(
4189 "Session {} advanced from Punching to Validating",
4190 session.session_id
4191 );
4192 events.push(CoordinationSessionEvent::PhaseChanged {
4193 session_id: session.session_id,
4194 old_phase: previous_phase,
4195 new_phase: session.phase,
4196 });
4197 }
4198 (CoordinationPhase::Validating, SessionAdvancementEvent::ValidationTimeout) => {
4199 session.phase = CoordinationPhase::Failed;
4200 warn!("Session {} validation timed out", session.session_id);
4201 events.push(CoordinationSessionEvent::SessionFailed {
4202 session_id: session.session_id,
4203 peer_a: session.peer_a,
4204 peer_b: session.peer_b,
4205 reason: "Validation timeout".to_string(),
4206 });
4207 }
4208 (phase, SessionAdvancementEvent::ReadyForCleanup) => {
4209 debug!(
4210 "Session {} ready for cleanup in phase {:?}",
4211 session.session_id, phase
4212 );
4213 events.push(CoordinationSessionEvent::ReadyForCleanup {
4214 session_id: session.session_id,
4215 });
4216 }
4217 _ => {
4218 warn!(
4220 "Invalid state transition for session {}: {:?} -> {:?}",
4221 session.session_id, session.phase, event
4222 );
4223 }
4224 }
4225
4226 Ok(events)
4227 }
4228
4229 #[allow(dead_code)] fn cleanup_completed_sessions(&mut self, now: Instant) {
4232 let cleanup_timeout = Duration::from_secs(300); let sessions_to_remove: Vec<CoordinationSessionId> = self
4235 .coordination_sessions
4236 .iter()
4237 .filter(|(_, session)| {
4238 matches!(
4239 session.phase,
4240 CoordinationPhase::Succeeded | CoordinationPhase::Failed
4241 ) && now.duration_since(session.started_at) > cleanup_timeout
4242 })
4243 .map(|(&session_id, _)| session_id)
4244 .collect();
4245
4246 for session_id in sessions_to_remove {
4247 if let Some(session) = self.coordination_sessions.remove(&session_id) {
4248 debug!(
4249 "Cleaned up completed session {} in phase {:?}",
4250 session_id, session.phase
4251 );
4252 }
4253 }
4254
4255 self.stats.active_sessions = self.coordination_sessions.len();
4256 }
4257
4258 #[allow(dead_code)] pub(crate) fn retry_failed_coordination(
4264 &mut self,
4265 session_id: CoordinationSessionId,
4266 now: Instant,
4267 ) -> Result<bool, NatTraversalError> {
4268 let session = self
4269 .coordination_sessions
4270 .get_mut(&session_id)
4271 .ok_or(NatTraversalError::NoActiveCoordination)?;
4272
4273 if !matches!(session.phase, CoordinationPhase::Failed) {
4275 return Ok(false);
4276 }
4277
4278 let base_delay = Duration::from_secs(1);
4280 let max_delay = Duration::from_secs(60);
4281 let retry_count = session.stats.successful_coordinations; let delay = std::cmp::min(
4284 base_delay * 2_u32.pow(retry_count.min(10)), max_delay,
4286 );
4287
4288 let _jitter_factor = 0.1;
4290 let jitter =
4291 Duration::from_millis((rand::random::<u64>() % 100) * delay.as_millis() as u64 / 1000);
4292 let total_delay = delay + jitter;
4293
4294 if now.duration_since(session.started_at) < total_delay {
4296 return Ok(false);
4297 }
4298
4299 const MAX_RETRIES: u32 = 5;
4301 if retry_count >= MAX_RETRIES {
4302 warn!(
4303 "Session {} exceeded maximum retry attempts ({})",
4304 session_id, MAX_RETRIES
4305 );
4306 return Ok(false);
4307 }
4308
4309 session.phase = CoordinationPhase::Requesting;
4311 session.started_at = now;
4312 session.sync_state.peer_a_ready = false;
4313 session.sync_state.peer_b_ready = false;
4314 session.stats.successful_coordinations += 1; info!(
4317 "Retrying coordination session {} (attempt {})",
4318 session_id,
4319 retry_count + 1
4320 );
4321 Ok(true)
4322 }
4323
4324 #[allow(dead_code)] pub(crate) fn handle_coordination_error(
4327 &mut self,
4328 session_id: CoordinationSessionId,
4329 error: NatTraversalError,
4330 _now: Instant,
4331 ) -> CoordinationRecoveryAction {
4332 let session = match self.coordination_sessions.get_mut(&session_id) {
4333 Some(session) => session,
4334 None => return CoordinationRecoveryAction::NoAction,
4335 };
4336
4337 match error {
4338 NatTraversalError::RateLimitExceeded => {
4339 warn!("Rate limit exceeded for session {}, will retry", session_id);
4341 CoordinationRecoveryAction::RetryWithBackoff
4342 }
4343 NatTraversalError::SecurityValidationFailed
4344 | NatTraversalError::SuspiciousCoordination => {
4345 session.phase = CoordinationPhase::Failed;
4347 warn!(
4348 "Security validation failed for session {}, marking as failed",
4349 session_id
4350 );
4351 CoordinationRecoveryAction::MarkAsFailed
4352 }
4353 NatTraversalError::InvalidAddress => {
4354 warn!("Invalid address in session {}, allowing retry", session_id);
4356 CoordinationRecoveryAction::RetryWithBackoff
4357 }
4358 NatTraversalError::NoActiveCoordination => {
4359 warn!(
4361 "No active coordination for session {}, cleaning up",
4362 session_id
4363 );
4364 CoordinationRecoveryAction::Cleanup
4365 }
4366 _ => {
4367 warn!(
4369 "Coordination error for session {}: {:?}, will retry",
4370 session_id, error
4371 );
4372 CoordinationRecoveryAction::RetryWithBackoff
4373 }
4374 }
4375 }
4376
4377 fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
4379 if let Some(_peer_record) = self.peer_registry.get(peer_id) {
4382 Some(Duration::from_millis(100))
4384 } else {
4385 None
4386 }
4387 }
4388
4389 #[allow(dead_code)] pub(crate) fn coordinate_hole_punching(
4395 &mut self,
4396 peer_a: PeerId,
4397 peer_b: PeerId,
4398 round: VarInt,
4399 now: Instant,
4400 ) -> Result<CoordinationSessionId, NatTraversalError> {
4401 let peer_a_record = self
4403 .peer_registry
4404 .get(&peer_a)
4405 .ok_or(NatTraversalError::UnknownCandidate)?;
4406 let peer_b_record = self
4407 .peer_registry
4408 .get(&peer_b)
4409 .ok_or(NatTraversalError::UnknownCandidate)?;
4410
4411 if !peer_a_record.can_coordinate || !peer_b_record.can_coordinate {
4412 return Err(NatTraversalError::InvalidCandidateState);
4413 }
4414
4415 let session_id = self.generate_session_id();
4417
4418 let session = CoordinationSession {
4420 session_id,
4421 peer_a,
4422 peer_b,
4423 current_round: round,
4424 started_at: now,
4425 phase: CoordinationPhase::Requesting,
4426 target_addresses: vec![
4427 (peer_a_record.observed_address, VarInt::from_u32(0)),
4428 (peer_b_record.observed_address, VarInt::from_u32(1)),
4429 ],
4430 sync_state: SynchronizationState {
4431 peer_a_ready: false,
4432 peer_b_ready: false,
4433 },
4434 stats: CoordinationSessionStats::default(),
4435 };
4436
4437 self.coordination_sessions.insert(session_id, session);
4438 self.stats.total_coordinations += 1;
4439 self.stats.active_sessions = self.coordination_sessions.len();
4440
4441 info!(
4442 "Started coordination session {} between peers {:?} and {:?} (round: {})",
4443 session_id,
4444 hex::encode(&peer_a[..8]),
4445 hex::encode(&peer_b[..8]),
4446 round
4447 );
4448
4449 Ok(session_id)
4450 }
4451
4452 #[allow(dead_code)] pub(crate) fn relay_coordination_frame(
4458 &mut self,
4459 session_id: CoordinationSessionId,
4460 from_peer: PeerId,
4461 frame: &crate::frame::PunchMeNow,
4462 _now: Instant,
4463 ) -> Result<Option<(PeerId, crate::frame::PunchMeNow)>, NatTraversalError> {
4464 let session = self
4465 .coordination_sessions
4466 .get_mut(&session_id)
4467 .ok_or(NatTraversalError::NoActiveCoordination)?;
4468
4469 if session.peer_a != from_peer && session.peer_b != from_peer {
4471 return Err(NatTraversalError::SuspiciousCoordination);
4472 }
4473
4474 let target_peer = if session.peer_a == from_peer {
4476 session.peer_b
4477 } else {
4478 session.peer_a
4479 };
4480
4481 let target_record = self
4483 .peer_registry
4484 .get(&target_peer)
4485 .ok_or(NatTraversalError::UnknownCandidate)?;
4486
4487 if session.peer_a == from_peer {
4489 session.sync_state.peer_a_ready = true;
4490 } else {
4491 session.sync_state.peer_b_ready = true;
4492 }
4493
4494 let relay_frame = crate::frame::PunchMeNow {
4496 round: frame.round,
4497 target_sequence: frame.target_sequence,
4498 local_address: target_record.observed_address,
4499 target_peer_id: Some(from_peer),
4500 };
4501
4502 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4504 session.phase = CoordinationPhase::Coordinating;
4505 info!(
4506 "Coordination phase complete for session {} - both peers ready",
4507 session_id
4508 );
4509 }
4510
4511 debug!(
4512 "Relaying coordination frame from {:?} to {:?} in session {}",
4513 hex::encode(&from_peer[..8]),
4514 hex::encode(&target_peer[..8]),
4515 session_id
4516 );
4517
4518 Ok(Some((target_peer, relay_frame)))
4519 }
4520
4521 #[allow(dead_code)] pub(crate) fn advance_coordination_round(
4527 &mut self,
4528 session_id: CoordinationSessionId,
4529 now: Instant,
4530 ) -> Result<CoordinationPhase, NatTraversalError> {
4531 let session = self
4532 .coordination_sessions
4533 .get_mut(&session_id)
4534 .ok_or(NatTraversalError::NoActiveCoordination)?;
4535
4536 let previous_phase = session.phase;
4537
4538 match session.phase {
4540 CoordinationPhase::Requesting => {
4541 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4543 session.phase = CoordinationPhase::Coordinating;
4544 debug!("Session {} advanced to Coordinating phase", session_id);
4545 }
4546 }
4547 CoordinationPhase::Coordinating => {
4548 let coordination_delay = Duration::from_millis(200); let punch_time = now + coordination_delay;
4551
4552 session.phase = CoordinationPhase::Preparing;
4553 debug!(
4554 "Session {} advanced to Preparing phase, punch time: {:?}",
4555 session_id, punch_time
4556 );
4557 }
4558 CoordinationPhase::Preparing => {
4559 session.phase = CoordinationPhase::Punching;
4561 debug!("Session {} advanced to Punching phase", session_id);
4562 }
4563 CoordinationPhase::Punching => {
4564 session.phase = CoordinationPhase::Validating;
4566 debug!("Session {} advanced to Validating phase", session_id);
4567 }
4568 CoordinationPhase::Validating => {
4569 let validation_timeout = Duration::from_secs(5);
4571 if now.duration_since(session.started_at) > validation_timeout {
4572 session.phase = CoordinationPhase::Failed;
4573 debug!("Session {} timed out in validation", session_id);
4574 }
4575 }
4576 CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
4577 }
4579 CoordinationPhase::Idle => {
4580 session.phase = CoordinationPhase::Requesting;
4582 }
4583 }
4584
4585 if session.phase != previous_phase {
4587 match session.phase {
4588 CoordinationPhase::Succeeded => {
4589 session.stats.successful_coordinations += 1;
4590 self.stats.successful_coordinations += 1;
4591 }
4592 CoordinationPhase::Failed => {
4593 }
4595 _ => {}
4596 }
4597 }
4598
4599 Ok(session.phase)
4600 }
4601
4602 #[allow(dead_code)] pub(crate) fn get_coordination_session(
4605 &self,
4606 session_id: CoordinationSessionId,
4607 ) -> Option<&CoordinationSession> {
4608 self.coordination_sessions.get(&session_id)
4609 }
4610
4611 #[allow(dead_code)] pub(crate) fn get_coordination_session_mut(
4614 &mut self,
4615 session_id: CoordinationSessionId,
4616 ) -> Option<&mut CoordinationSession> {
4617 self.coordination_sessions.get_mut(&session_id)
4618 }
4619
4620 #[allow(dead_code)] pub(crate) fn mark_coordination_success(
4623 &mut self,
4624 session_id: CoordinationSessionId,
4625 _now: Instant,
4626 ) -> Result<(), NatTraversalError> {
4627 let session = self
4628 .coordination_sessions
4629 .get_mut(&session_id)
4630 .ok_or(NatTraversalError::NoActiveCoordination)?;
4631
4632 session.phase = CoordinationPhase::Succeeded;
4633 session.stats.successful_coordinations += 1;
4634 self.stats.successful_coordinations += 1;
4635
4636 if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4638 peer_a_record.coordination_count += 1;
4639 peer_a_record.success_rate =
4640 (peer_a_record.success_rate * (peer_a_record.coordination_count - 1) as f64 + 1.0)
4641 / peer_a_record.coordination_count as f64;
4642 }
4643
4644 if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4645 peer_b_record.coordination_count += 1;
4646 peer_b_record.success_rate =
4647 (peer_b_record.success_rate * (peer_b_record.coordination_count - 1) as f64 + 1.0)
4648 / peer_b_record.coordination_count as f64;
4649 }
4650
4651 info!("Coordination session {} marked as successful", session_id);
4652 Ok(())
4653 }
4654
4655 #[allow(dead_code)] pub(crate) fn mark_coordination_failure(
4658 &mut self,
4659 session_id: CoordinationSessionId,
4660 reason: &str,
4661 _now: Instant,
4662 ) -> Result<(), NatTraversalError> {
4663 let session = self
4664 .coordination_sessions
4665 .get_mut(&session_id)
4666 .ok_or(NatTraversalError::NoActiveCoordination)?;
4667
4668 session.phase = CoordinationPhase::Failed;
4669
4670 if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4672 peer_a_record.coordination_count += 1;
4673 peer_a_record.success_rate = (peer_a_record.success_rate
4674 * (peer_a_record.coordination_count - 1) as f64)
4675 / peer_a_record.coordination_count as f64;
4676 }
4677
4678 if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4679 peer_b_record.coordination_count += 1;
4680 peer_b_record.success_rate = (peer_b_record.success_rate
4681 * (peer_b_record.coordination_count - 1) as f64)
4682 / peer_b_record.coordination_count as f64;
4683 }
4684
4685 warn!("Coordination session {} failed: {}", session_id, reason);
4686 Ok(())
4687 }
4688
4689 pub(crate) fn get_peer_record(&self, peer_id: PeerId) -> Option<&PeerObservationRecord> {
4691 self.peer_registry.get(&peer_id)
4692 }
4693}
4694
4695impl Default for BootstrapConfig {
4696 fn default() -> Self {
4697 Self { _unused: () }
4698 }
4699}
4700
4701#[derive(Debug)]
4707#[allow(dead_code)] pub(super) struct MultiDestinationTransmitter {
4709 active_targets: Vec<MultiDestPunchTarget>,
4711 stats: MultiDestTransmissionStats,
4713 max_targets: usize,
4715 rate_limiter: TransmissionRateLimiter,
4717 target_selector: AdaptiveTargetSelector,
4719 performance_monitor: TransmissionPerformanceMonitor,
4721}
4722
4723#[derive(Debug, Default, Clone)]
4725pub(super) struct MultiDestTransmissionStats {
4726 _unused: (),
4727}
4728
/// Rate limiter for transmissions.
///
/// Placeholder: the type carries no data yet, only a unit `_unused` field.
#[derive(Debug)]
struct TransmissionRateLimiter {
    _unused: (),
}
4734
/// Selector for transmission targets.
///
/// Placeholder: the type carries no data yet, only a unit `_unused` field.
#[derive(Debug)]
struct AdaptiveTargetSelector {
    _unused: (),
}
4740
/// Performance monitor for transmissions.
///
/// Placeholder: the type carries no data yet, only a unit `_unused` field.
#[derive(Debug)]
struct TransmissionPerformanceMonitor {
    _unused: (),
}
4746
4747impl MultiDestinationTransmitter {
4748 pub(super) fn new() -> Self {
4750 Self {
4751 active_targets: Vec::new(),
4752 stats: MultiDestTransmissionStats::default(),
4753 max_targets: 8, rate_limiter: TransmissionRateLimiter::new(100, 50), target_selector: AdaptiveTargetSelector::new(),
4756 performance_monitor: TransmissionPerformanceMonitor::new(),
4757 }
4758 }
4759}
4760
4761impl TransmissionRateLimiter {
4762 fn new(_max_pps: u64, _burst_size: u64) -> Self {
4763 Self { _unused: () }
4764 }
4765}
4766
4767impl AdaptiveTargetSelector {
4768 fn new() -> Self {
4769 Self { _unused: () }
4770 }
4771}
4772
4773impl TransmissionPerformanceMonitor {
4774 fn new() -> Self {
4775 Self { _unused: () }
4776 }
4777}
4778
/// Unit tests for adding observed (QUIC-discovered) addresses as local
/// candidates and for pairing them with remote candidates.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh state for `role`.
    ///
    /// The literal arguments are presumably the candidate limit (10) and the
    /// coordination timeout (30 s) — confirm against `NatTraversalState::new`.
    fn create_test_state(role: NatTraversalRole) -> NatTraversalState {
        NatTraversalState::new(role, 10, Duration::from_secs(30))
    }

    /// Candidate source used for every observed address in these tests.
    fn observed() -> CandidateSource {
        CandidateSource::Observed { by_node: None }
    }

    #[test]
    fn test_add_quic_discovered_address() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5678));
        let seq = state.add_local_candidate(discovered_addr, observed(), now);

        assert_eq!(state.local_candidates.len(), 1);
        let candidate = state.local_candidates.get(&seq).unwrap();
        assert_eq!(candidate.address, discovered_addr);
        assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
        assert_eq!(candidate.state, CandidateState::New);

        assert!(candidate.priority > 0);
    }

    #[test]
    fn test_add_multiple_quic_discovered_addresses() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let addrs = [
            SocketAddr::from(([1, 2, 3, 4], 5678)),
            SocketAddr::from(([5, 6, 7, 8], 9012)),
            // 2001:db8::1 from the IPv6 documentation prefix. The first
            // segment must be written in hex like the second one; a decimal
            // `2001` would produce 7d1:db8::1 instead.
            SocketAddr::from(([0x2001, 0xdb8, 0, 0, 0, 0, 0, 1], 443)),
        ];

        let mut sequences = Vec::with_capacity(addrs.len());
        for addr in &addrs {
            sequences.push(state.add_local_candidate(*addr, observed(), now));
        }

        assert_eq!(state.local_candidates.len(), 3);

        for (seq, addr) in sequences.iter().zip(&addrs) {
            let candidate = state.local_candidates.get(seq).unwrap();
            assert_eq!(candidate.address, *addr);
            assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
        }
    }

    #[test]
    fn test_quic_discovered_addresses_in_local_candidates() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        let seq = state.add_local_candidate(addr, observed(), now);

        assert!(state.local_candidates.contains_key(&seq));
        let candidate = state.local_candidates.get(&seq).unwrap();
        assert_eq!(candidate.address, addr);

        assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
    }

    #[test]
    fn test_quic_discovered_addresses_included_in_hole_punching() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let local_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        state.add_local_candidate(local_addr, observed(), now);

        let remote_addr = SocketAddr::from(([1, 2, 3, 4], 6000));
        let priority = VarInt::from_u32(100);
        state
            .add_remote_candidate(VarInt::from_u32(1), remote_addr, priority, now)
            .expect("add remote candidate should succeed");

        state.generate_candidate_pairs(now);

        // One local and one remote candidate must yield exactly one pair.
        assert_eq!(state.candidate_pairs.len(), 1);
        let pair = &state.candidate_pairs[0];
        assert_eq!(pair.local_addr, local_addr);
        assert_eq!(pair.remote_addr, remote_addr);
    }

    #[test]
    fn test_prioritize_quic_discovered_over_predicted() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let predicted_addr = SocketAddr::from(([1, 2, 3, 4], 5000));
        let predicted_seq =
            state.add_local_candidate(predicted_addr, CandidateSource::Predicted, now);

        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5001));
        let discovered_seq = state.add_local_candidate(discovered_addr, observed(), now);

        let predicted_priority = state.local_candidates.get(&predicted_seq).unwrap().priority;
        let discovered_priority = state
            .local_candidates
            .get(&discovered_seq)
            .unwrap()
            .priority;

        // An observed address must never rank below a predicted one.
        assert!(discovered_priority >= predicted_priority);
    }

    #[test]
    fn test_integration_with_nat_traversal_flow() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let local_addr = SocketAddr::from(([192, 168, 1, 2], 5000));
        state.add_local_candidate(local_addr, CandidateSource::Local, now);

        let discovered_addr = SocketAddr::from(([44, 55, 66, 77], 5000));
        state.add_local_candidate(discovered_addr, observed(), now);

        let remote1 = SocketAddr::from(([93, 184, 215, 123], 6000));
        let remote2 = SocketAddr::from(([172, 217, 16, 34], 7000));
        let priority = VarInt::from_u32(100);
        state
            .add_remote_candidate(VarInt::from_u32(1), remote1, priority, now)
            .expect("add remote candidate should succeed");
        state
            .add_remote_candidate(VarInt::from_u32(2), remote2, priority, now)
            .expect("add remote candidate should succeed");

        state.generate_candidate_pairs(now);

        // Two local candidates times two remote candidates.
        assert_eq!(state.candidate_pairs.len(), 4);

        let discovered_pairs: Vec<_> = state
            .candidate_pairs
            .iter()
            .filter(|p| p.local_addr == discovered_addr)
            .collect();
        assert_eq!(discovered_pairs.len(), 2);
    }
}