1use std::{
2 collections::{HashMap, VecDeque},
3 net::{IpAddr, SocketAddr, Ipv4Addr, Ipv6Addr},
4 time::Duration,
5};
6
7use tracing::{trace, debug, warn, info};
8use crate::shared::ConnectionId;
9
10use crate::{
11 Instant, VarInt,
12};
13
14#[derive(Debug)]
19pub(super) struct NatTraversalState {
20 pub(super) role: NatTraversalRole,
22 pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
24 pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
26 pub(super) candidate_pairs: Vec<CandidatePair>,
28 pub(super) pair_index: HashMap<SocketAddr, usize>,
30 pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
32 pub(super) coordination: Option<CoordinationState>,
34 pub(super) next_sequence: VarInt,
36 pub(super) max_candidates: u32,
38 pub(super) coordination_timeout: Duration,
40 pub(super) stats: NatTraversalStats,
42 pub(super) security_state: SecurityValidationState,
44 pub(super) network_monitor: NetworkConditionMonitor,
46 pub(super) resource_manager: ResourceCleanupCoordinator,
48 pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
50 #[allow(dead_code)] pub(super) multi_dest_transmitter: MultiDestinationTransmitter,
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum NatTraversalRole {
58 Client,
60 Server { can_relay: bool },
62 Bootstrap,
64}
65
66#[derive(Debug, Clone)]
68pub(super) struct AddressCandidate {
69 pub(super) address: SocketAddr,
71 pub(super) priority: u32,
73 pub(super) source: CandidateSource,
75 pub(super) discovered_at: Instant,
77 pub(super) state: CandidateState,
79 pub(super) attempt_count: u32,
81 pub(super) last_attempt: Option<Instant>,
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub enum CandidateSource {
88 Local,
90 Observed { by_node: Option<VarInt> },
92 Peer,
94 Predicted,
96}
97
98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100pub enum CandidateState {
101 New,
103 Validating,
105 Valid,
107 Failed,
109 Removed,
111}
112
113#[derive(Debug)]
115pub(super) struct PathValidationState {
116 pub(super) challenge: u64,
118 pub(super) sent_at: Instant,
120 pub(super) retry_count: u32,
122 pub(super) max_retries: u32,
124 #[allow(dead_code)] pub(super) coordination_round: Option<VarInt>,
127 pub(super) timeout_state: AdaptiveTimeoutState,
129 pub(super) last_retry_at: Option<Instant>,
131}
132
133#[derive(Debug)]
135pub(super) struct CoordinationState {
136 pub(super) round: VarInt,
138 pub(super) punch_targets: Vec<PunchTarget>,
140 pub(super) round_start: Instant,
142 pub(super) punch_start: Instant,
144 #[allow(dead_code)] pub(super) round_duration: Duration,
147 pub(super) state: CoordinationPhase,
149 pub(super) punch_request_sent: bool,
151 pub(super) peer_punch_received: bool,
153 pub(super) retry_count: u32,
155 pub(super) max_retries: u32,
157 pub(super) timeout_state: AdaptiveTimeoutState,
159 pub(super) last_retry_at: Option<Instant>,
161}
162
163#[derive(Debug, Clone, Copy, PartialEq, Eq)]
165#[allow(dead_code)] pub(crate) enum CoordinationPhase {
167 Idle,
169 Requesting,
171 Coordinating,
173 Preparing,
175 Punching,
177 Validating,
179 Succeeded,
181 Failed,
183}
184
185#[derive(Debug, Clone)]
187pub(super) struct PunchTarget {
188 pub(super) remote_addr: SocketAddr,
190 pub(super) remote_sequence: VarInt,
192 pub(super) challenge: u64,
194}
195
196#[derive(Debug, Clone, PartialEq, Eq)]
198pub(super) enum TimeoutAction {
199 RetryDiscovery,
201 RetryCoordination,
203 StartValidation,
205 Complete,
207 Failed,
209}
210
211#[derive(Debug, Clone)]
213#[allow(dead_code)] pub(super) struct MultiDestPunchTarget {
215 pub destination: SocketAddr,
217 pub local_addr: SocketAddr,
219 pub pair_type: PairType,
221 pub priority: u32,
223 pub created_at: Instant,
225}
226
227#[derive(Debug, Clone)]
229pub(super) struct CandidatePair {
230 pub(super) remote_sequence: VarInt,
232 pub(super) local_addr: SocketAddr,
234 pub(super) remote_addr: SocketAddr,
236 pub(super) priority: u64,
238 pub(super) state: PairState,
240 pub(super) pair_type: PairType,
242 pub(super) created_at: Instant,
244 #[allow(dead_code)] pub(super) last_check: Option<Instant>,
247}
248
249#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub(super) enum PairState {
252 Waiting,
254 Succeeded,
256 #[allow(dead_code)] Failed,
259 Frozen,
261}
262
263#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
265pub(super) enum PairType {
266 HostToHost,
268 HostToServerReflexive,
270 ServerReflexiveToHost,
272 ServerReflexiveToServerReflexive,
274 PeerReflexive,
276}
277
278#[derive(Debug, Clone, Copy, PartialEq, Eq)]
280pub(super) enum CandidateType {
281 Host,
283 ServerReflexive,
285 PeerReflexive,
287}
288
289fn calculate_candidate_priority(
292 candidate_type: CandidateType,
293 local_preference: u16,
294 component_id: u8,
295) -> u32 {
296 let type_preference = match candidate_type {
297 CandidateType::Host => 126,
298 CandidateType::PeerReflexive => 110,
299 CandidateType::ServerReflexive => 100,
300 };
301
302 (1u32 << 24) * type_preference
304 + (1u32 << 8) * local_preference as u32
305 + component_id as u32
306}
307
308fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
311 let g = local_priority as u64;
312 let d = remote_priority as u64;
313
314 (1u64 << 32) * g.min(d) + 2 * g.max(d) + if g > d { 1 } else { 0 }
316}
317
318fn classify_candidate_type(source: CandidateSource) -> CandidateType {
320 match source {
321 CandidateSource::Local => CandidateType::Host,
322 CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
323 CandidateSource::Peer => CandidateType::PeerReflexive,
324 CandidateSource::Predicted => CandidateType::ServerReflexive, }
326}
327
328fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
330 match (local_type, remote_type) {
331 (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
332 (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
333 (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
334 (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => PairType::ServerReflexiveToServerReflexive,
335 (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => PairType::PeerReflexive,
336 }
337}
338
339fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
341 match (local.address, remote.address) {
343 (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
344 (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
345 _ => false, }
347}
348
349#[derive(Debug, Default, Clone)]
351pub(crate) struct NatTraversalStats {
352 pub(super) remote_candidates_received: u32,
354 pub(super) local_candidates_sent: u32,
356 pub(super) validations_succeeded: u32,
358 #[allow(dead_code)] pub(super) validations_failed: u32,
361 pub(super) coordination_rounds: u32,
363 #[allow(dead_code)] pub(super) successful_coordinations: u32,
366 #[allow(dead_code)] pub(super) failed_coordinations: u32,
369 #[allow(dead_code)] pub(super) timed_out_coordinations: u32,
372 pub(super) coordination_failures: u32,
374 pub(super) direct_connections: u32,
376 pub(super) security_rejections: u32,
378 pub(super) rate_limit_violations: u32,
380 pub(super) invalid_address_rejections: u32,
382 pub(super) suspicious_coordination_attempts: u32,
384}
385
386#[derive(Debug)]
388pub(super) struct SecurityValidationState {
389 candidate_rate_tracker: VecDeque<Instant>,
391 max_candidates_per_window: u32,
393 rate_window: Duration,
395 coordination_requests: VecDeque<CoordinationRequest>,
397 max_coordination_per_window: u32,
399 address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
401 #[allow(dead_code)] validation_cache_timeout: Duration,
404}
405
406#[derive(Debug, Clone)]
408struct CoordinationRequest {
409 timestamp: Instant,
411}
412
413
414#[derive(Debug, Clone, Copy, PartialEq, Eq)]
416enum AddressValidationResult {
417 Valid,
419 Invalid,
421 Suspicious,
423}
424
425#[derive(Debug, Clone)]
427pub(super) struct AdaptiveTimeoutState {
428 current_timeout: Duration,
430 min_timeout: Duration,
432 max_timeout: Duration,
434 base_timeout: Duration,
436 backoff_multiplier: f64,
438 max_backoff_multiplier: f64,
440 jitter_factor: f64,
442 srtt: Option<Duration>,
444 rttvar: Option<Duration>,
446 last_rtt: Option<Duration>,
448 consecutive_timeouts: u32,
450 successful_responses: u32,
452}
453
454#[derive(Debug)]
456pub(super) struct NetworkConditionMonitor {
457 rtt_samples: VecDeque<Duration>,
459 max_samples: usize,
461 packet_loss_rate: f64,
463 #[allow(dead_code)] congestion_window: u32,
466 quality_score: f64,
468 last_quality_update: Instant,
470 quality_update_interval: Duration,
472 timeout_stats: TimeoutStatistics,
474}
475
476#[derive(Debug, Default)]
478struct TimeoutStatistics {
479 total_timeouts: u64,
481 total_responses: u64,
483 avg_response_time: Duration,
485 timeout_rate: f64,
487 last_update: Option<Instant>,
489}
490
491impl SecurityValidationState {
492 fn new() -> Self {
494 Self {
495 candidate_rate_tracker: VecDeque::new(),
496 max_candidates_per_window: 20, rate_window: Duration::from_secs(60),
498 coordination_requests: VecDeque::new(),
499 max_coordination_per_window: 5, address_validation_cache: HashMap::new(),
501 validation_cache_timeout: Duration::from_secs(300), }
503 }
504
505 #[allow(dead_code)] fn new_with_limits(
508 max_candidates_per_window: u32,
509 max_coordination_per_window: u32,
510 rate_window: Duration,
511 ) -> Self {
512 Self {
513 candidate_rate_tracker: VecDeque::new(),
514 max_candidates_per_window,
515 rate_window,
516 coordination_requests: VecDeque::new(),
517 max_coordination_per_window,
518 address_validation_cache: HashMap::new(),
519 validation_cache_timeout: Duration::from_secs(300),
520 }
521 }
522
523 fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
528 self.cleanup_rate_tracker(now);
530 self.cleanup_coordination_tracker(now);
531
532 let _current_candidate_rate = self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
534 let _current_coordination_rate = self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
535
536 let peer_reputation = self.calculate_peer_reputation(peer_id);
538 let adaptive_candidate_limit = (self.max_candidates_per_window as f64 * peer_reputation) as u32;
539 let adaptive_coordination_limit = (self.max_coordination_per_window as f64 * peer_reputation) as u32;
540
541 if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
543 debug!("Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
544 hex::encode(&peer_id[..8]), self.candidate_rate_tracker.len(), adaptive_candidate_limit);
545 return true;
546 }
547
548 if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
549 debug!("Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
550 hex::encode(&peer_id[..8]), self.coordination_requests.len(), adaptive_coordination_limit);
551 return true;
552 }
553
554 false
555 }
556
557 fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
562 1.0
572 }
573
574 fn validate_amplification_limits(
579 &mut self,
580 source_addr: SocketAddr,
581 target_addr: SocketAddr,
582 now: Instant,
583 ) -> Result<(), NatTraversalError> {
584 let amplification_key = (source_addr, target_addr);
586
587 if self.is_amplification_suspicious(amplification_key, now) {
596 warn!("Potential amplification attack detected: {} -> {}", source_addr, target_addr);
597 return Err(NatTraversalError::SuspiciousCoordination);
598 }
599
600 Ok(())
601 }
602
603 fn is_amplification_suspicious(&self, _amplification_key: (SocketAddr, SocketAddr), _now: Instant) -> bool {
605 false
615 }
616
617 #[allow(dead_code)] fn generate_secure_coordination_round(&self) -> VarInt {
623 let secure_random: u64 = rand::random();
625
626 let bounded_random = secure_random % 1000000; VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
630 }
631
632 fn enhanced_address_validation(
640 &mut self,
641 addr: SocketAddr,
642 source_addr: SocketAddr,
643 now: Instant,
644 ) -> Result<AddressValidationResult, NatTraversalError> {
645 let basic_result = self.validate_address(addr, now);
647
648 match basic_result {
649 AddressValidationResult::Invalid => {
650 return Err(NatTraversalError::InvalidAddress);
651 }
652 AddressValidationResult::Suspicious => {
653 return Err(NatTraversalError::SuspiciousCoordination);
654 }
655 AddressValidationResult::Valid => {
656 }
658 }
659
660 self.validate_amplification_limits(source_addr, addr, now)?;
662
663 if self.is_address_in_suspicious_range(addr) {
665 warn!("Address in suspicious range detected: {}", addr);
666 return Err(NatTraversalError::SuspiciousCoordination);
667 }
668
669 if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
670 warn!("Suspicious coordination pattern detected: {} -> {}", source_addr, addr);
671 return Err(NatTraversalError::SuspiciousCoordination);
672 }
673
674 Ok(AddressValidationResult::Valid)
675 }
676
677 fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
679 match addr.ip() {
680 IpAddr::V4(ipv4) => {
681 let octets = ipv4.octets();
683
684 if octets[0] == 0 || octets[0] == 127 {
686 return true;
687 }
688
689 if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
691 return true;
692 }
693 if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
694 return true;
695 }
696 if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
697 return true;
698 }
699
700 false
701 }
702 IpAddr::V6(ipv6) => {
703 if ipv6.is_loopback() || ipv6.is_unspecified() {
705 return true;
706 }
707
708 let segments = ipv6.segments();
710 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
711 return true;
712 }
713
714 false
715 }
716 }
717 }
718
719 fn is_coordination_pattern_suspicious(
721 &self,
722 _source_addr: SocketAddr,
723 _target_addr: SocketAddr,
724 _now: Instant,
725 ) -> bool {
726 false
736 }
737
738 fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
740 self.cleanup_rate_tracker(now);
742
743 if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
745 return true;
746 }
747
748 self.candidate_rate_tracker.push_back(now);
750 false
751 }
752
753 fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
755 self.cleanup_coordination_tracker(now);
757
758 if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
760 return true;
761 }
762
763 let request = CoordinationRequest {
765 timestamp: now,
766 };
767 self.coordination_requests.push_back(request);
768 false
769 }
770
771 fn cleanup_rate_tracker(&mut self, now: Instant) {
773 let cutoff = now - self.rate_window;
774 while let Some(&front_time) = self.candidate_rate_tracker.front() {
775 if front_time < cutoff {
776 self.candidate_rate_tracker.pop_front();
777 } else {
778 break;
779 }
780 }
781 }
782
783 fn cleanup_coordination_tracker(&mut self, now: Instant) {
785 let cutoff = now - self.rate_window;
786 while let Some(front_request) = self.coordination_requests.front() {
787 if front_request.timestamp < cutoff {
788 self.coordination_requests.pop_front();
789 } else {
790 break;
791 }
792 }
793 }
794
795 fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
797 if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
799 return cached_result;
800 }
801
802 let result = self.perform_address_validation(addr);
803
804 self.address_validation_cache.insert(addr, result);
806
807 if self.address_validation_cache.len() > 1000 {
809 self.cleanup_address_cache(now);
810 }
811
812 result
813 }
814
815 fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
817 match addr.ip() {
818 IpAddr::V4(ipv4) => {
819 if ipv4.is_unspecified() || ipv4.is_broadcast() {
821 return AddressValidationResult::Invalid;
822 }
823
824 if ipv4.is_multicast() || ipv4.is_documentation() {
826 return AddressValidationResult::Suspicious;
827 }
828
829 if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
831 return AddressValidationResult::Invalid;
832 }
833
834 if self.is_suspicious_ipv4(ipv4) {
836 return AddressValidationResult::Suspicious;
837 }
838 }
839 IpAddr::V6(ipv6) => {
840 if ipv6.is_unspecified() || ipv6.is_multicast() {
842 return AddressValidationResult::Invalid;
843 }
844
845 if self.is_suspicious_ipv6(ipv6) {
847 return AddressValidationResult::Suspicious;
848 }
849 }
850 }
851
852 if addr.port() == 0 || addr.port() < 1024 {
854 return AddressValidationResult::Suspicious;
855 }
856
857 AddressValidationResult::Valid
858 }
859
860 fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
862 let octets = ipv4.octets();
863
864 if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
867 return true;
868 }
869
870 false
873 }
874
875 fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
877 let segments = ipv6.segments();
878
879 if segments.iter().all(|&s| s == segments[0]) {
881 return true;
882 }
883
884 false
885 }
886
887 fn cleanup_address_cache(&mut self, _now: Instant) {
889 if self.address_validation_cache.len() > 500 {
892 let keys_to_remove: Vec<_> = self.address_validation_cache
893 .keys()
894 .take(self.address_validation_cache.len() / 2)
895 .copied()
896 .collect();
897
898 for key in keys_to_remove {
899 self.address_validation_cache.remove(&key);
900 }
901 }
902 }
903
904 fn validate_punch_me_now_frame(
912 &mut self,
913 frame: &crate::frame::PunchMeNow,
914 source_addr: SocketAddr,
915 peer_id: [u8; 32],
916 now: Instant
917 ) -> Result<(), NatTraversalError> {
918 if self.is_coordination_rate_limited(now) {
920 debug!("PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}", hex::encode(&peer_id[..8]));
921 return Err(NatTraversalError::RateLimitExceeded);
922 }
923
924 let addr_validation = self.validate_address(frame.local_address, now);
926 match addr_validation {
927 AddressValidationResult::Invalid => {
928 debug!("PUNCH_ME_NOW frame rejected: invalid local_address {:?} from peer {:?}",
929 frame.local_address, hex::encode(&peer_id[..8]));
930 return Err(NatTraversalError::InvalidAddress);
931 }
932 AddressValidationResult::Suspicious => {
933 debug!("PUNCH_ME_NOW frame rejected: suspicious local_address {:?} from peer {:?}",
934 frame.local_address, hex::encode(&peer_id[..8]));
935 return Err(NatTraversalError::SuspiciousCoordination);
936 }
937 AddressValidationResult::Valid => {
938 }
940 }
941
942 if !self.validate_address_consistency(frame.local_address, source_addr) {
945 debug!("PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
946 frame.local_address, source_addr);
947 return Err(NatTraversalError::SuspiciousCoordination);
948 }
949
950 if !self.validate_coordination_parameters(frame) {
952 debug!("PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
953 hex::encode(&peer_id[..8]));
954 return Err(NatTraversalError::SuspiciousCoordination);
955 }
956
957 if let Some(target_peer_id) = frame.target_peer_id {
959 if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
960 debug!("PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
961 hex::encode(&peer_id[..8]), hex::encode(&target_peer_id[..8]));
962 return Err(NatTraversalError::SuspiciousCoordination);
963 }
964 }
965
966 if !self.validate_resource_limits(frame) {
968 debug!("PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
969 hex::encode(&peer_id[..8]));
970 return Err(NatTraversalError::ResourceLimitExceeded);
971 }
972
973 debug!("PUNCH_ME_NOW frame validation passed for peer {:?}", hex::encode(&peer_id[..8]));
974 Ok(())
975 }
976
977 fn validate_address_consistency(&self, claimed_addr: SocketAddr, observed_addr: SocketAddr) -> bool {
982 match (claimed_addr.ip(), observed_addr.ip()) {
987 (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
988 if claimed_ip == observed_ip {
990 return true;
991 }
992
993 if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
995 return true;
996 }
997
998 !claimed_ip.is_private() && !observed_ip.is_private()
1001 }
1002 (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
1003 claimed_ip == observed_ip ||
1005 self.are_in_same_prefix_v6(claimed_ip, observed_ip)
1006 }
1007 _ => {
1008 false
1010 }
1011 }
1012 }
1013
1014 fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
1016 let ip1_octets = ip1.octets();
1018 let ip2_octets = ip2.octets();
1019
1020 if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
1022 return true;
1023 }
1024
1025 if ip1_octets[0] == 172 && ip2_octets[0] == 172 &&
1027 (16..=31).contains(&ip1_octets[1]) && (16..=31).contains(&ip2_octets[1]) {
1028 return true;
1029 }
1030
1031 if ip1_octets[0] == 192 && ip1_octets[1] == 168 &&
1033 ip2_octets[0] == 192 && ip2_octets[1] == 168 {
1034 return true;
1035 }
1036
1037 false
1038 }
1039
1040 fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1042 let segments1 = ip1.segments();
1044 let segments2 = ip2.segments();
1045
1046 segments1[0] == segments2[0] &&
1047 segments1[1] == segments2[1] &&
1048 segments1[2] == segments2[2] &&
1049 segments1[3] == segments2[3]
1050 }
1051
1052 fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1054 if frame.round.into_inner() > 1000000 {
1056 return false;
1057 }
1058
1059 if frame.target_sequence.into_inner() > 10000 {
1061 return false;
1062 }
1063
1064 match frame.local_address.ip() {
1066 IpAddr::V4(ipv4) => {
1067 !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1069 }
1070 IpAddr::V6(ipv6) => {
1071 !ipv6.is_unspecified() && !ipv6.is_multicast()
1073 }
1074 }
1075 }
1076
1077 fn validate_target_peer_request(
1079 &self,
1080 requesting_peer: [u8; 32],
1081 target_peer: [u8; 32],
1082 _frame: &crate::frame::PunchMeNow
1083 ) -> bool {
1084 if requesting_peer == target_peer {
1086 return false;
1087 }
1088
1089 true
1095 }
1096
1097 fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1099 self.coordination_requests.len() < self.max_coordination_per_window as usize
1108 }
1109}
1110
1111impl AdaptiveTimeoutState {
1112 pub(crate) fn new() -> Self {
1114 let base_timeout = Duration::from_millis(1000); Self {
1116 current_timeout: base_timeout,
1117 min_timeout: Duration::from_millis(100),
1118 max_timeout: Duration::from_secs(30),
1119 base_timeout,
1120 backoff_multiplier: 1.0,
1121 max_backoff_multiplier: 8.0,
1122 jitter_factor: 0.1, srtt: None,
1124 rttvar: None,
1125 last_rtt: None,
1126 consecutive_timeouts: 0,
1127 successful_responses: 0,
1128 }
1129 }
1130
1131 fn update_success(&mut self, rtt: Duration) {
1133 self.last_rtt = Some(rtt);
1134 self.successful_responses += 1;
1135 self.consecutive_timeouts = 0;
1136
1137 match self.srtt {
1139 None => {
1140 self.srtt = Some(rtt);
1141 self.rttvar = Some(rtt / 2);
1142 }
1143 Some(srtt) => {
1144 let rttvar = self.rttvar.unwrap_or(rtt / 2);
1145 let abs_diff = if rtt > srtt { rtt - srtt } else { srtt - rtt };
1146
1147 self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1148 self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1149 }
1150 }
1151
1152 self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1154
1155 self.calculate_current_timeout();
1157 }
1158
1159 fn update_timeout(&mut self) {
1161 self.consecutive_timeouts += 1;
1162
1163 self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1165
1166 self.calculate_current_timeout();
1168 }
1169
1170 fn calculate_current_timeout(&mut self) {
1172 let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1173 srtt + rttvar * 4
1175 } else {
1176 self.base_timeout
1177 };
1178
1179 let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1181
1182 let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1184 let timeout = timeout.mul_f64(jitter);
1185
1186 self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1188 }
1189
1190 fn get_timeout(&self) -> Duration {
1192 self.current_timeout
1193 }
1194
1195 fn should_retry(&self, max_retries: u32) -> bool {
1197 self.consecutive_timeouts < max_retries
1198 }
1199
1200 fn get_retry_delay(&self) -> Duration {
1202 let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1203 delay.clamp(self.min_timeout, self.max_timeout)
1204 }
1205}
1206
1207#[derive(Debug)]
1209pub(super) struct ResourceManagementConfig {
1210 max_active_validations: usize,
1212 max_local_candidates: usize,
1214 max_remote_candidates: usize,
1216 max_candidate_pairs: usize,
1218 #[allow(dead_code)] max_coordination_history: usize,
1221 cleanup_interval: Duration,
1223 candidate_timeout: Duration,
1225 validation_timeout: Duration,
1227 coordination_timeout: Duration,
1229 memory_pressure_threshold: f64,
1231 aggressive_cleanup_threshold: f64,
1233}
1234
1235#[derive(Debug, Default)]
1237pub(super) struct ResourceStats {
1238 active_validations: usize,
1240 local_candidates: usize,
1242 remote_candidates: usize,
1244 candidate_pairs: usize,
1246 peak_memory_usage: usize,
1248 cleanup_operations: u64,
1250 resources_cleaned: u64,
1252 allocation_failures: u64,
1254 #[allow(dead_code)] last_cleanup: Option<Instant>,
1257 memory_pressure: f64,
1259}
1260
1261#[derive(Debug)]
1263pub(super) struct ResourceCleanupCoordinator {
1264 config: ResourceManagementConfig,
1266 stats: ResourceStats,
1268 last_cleanup: Option<Instant>,
1270 cleanup_counter: u64,
1272 shutdown_requested: bool,
1274}
1275
1276impl ResourceManagementConfig {
1277 fn new() -> Self {
1279 Self {
1280 max_active_validations: 100,
1281 max_local_candidates: 50,
1282 max_remote_candidates: 100,
1283 max_candidate_pairs: 200,
1284 max_coordination_history: 10,
1285 cleanup_interval: Duration::from_secs(30),
1286 candidate_timeout: Duration::from_secs(300), validation_timeout: Duration::from_secs(30),
1288 coordination_timeout: Duration::from_secs(60),
1289 memory_pressure_threshold: 0.75,
1290 aggressive_cleanup_threshold: 0.90,
1291 }
1292 }
1293
1294 #[allow(dead_code)] fn low_memory() -> Self {
1297 Self {
1298 max_active_validations: 25,
1299 max_local_candidates: 10,
1300 max_remote_candidates: 25,
1301 max_candidate_pairs: 50,
1302 max_coordination_history: 3,
1303 cleanup_interval: Duration::from_secs(15),
1304 candidate_timeout: Duration::from_secs(180), validation_timeout: Duration::from_secs(20),
1306 coordination_timeout: Duration::from_secs(30),
1307 memory_pressure_threshold: 0.60,
1308 aggressive_cleanup_threshold: 0.80,
1309 }
1310 }
1311}
1312
1313impl ResourceCleanupCoordinator {
1314 fn new() -> Self {
1316 Self {
1317 config: ResourceManagementConfig::new(),
1318 stats: ResourceStats::default(),
1319 last_cleanup: None,
1320 cleanup_counter: 0,
1321 shutdown_requested: false,
1322 }
1323 }
1324
1325 #[allow(dead_code)] fn low_memory() -> Self {
1328 Self {
1329 config: ResourceManagementConfig::low_memory(),
1330 stats: ResourceStats::default(),
1331 last_cleanup: None,
1332 cleanup_counter: 0,
1333 shutdown_requested: false,
1334 }
1335 }
1336
1337 fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1339 state.active_validations.len() > self.config.max_active_validations ||
1340 state.local_candidates.len() > self.config.max_local_candidates ||
1341 state.remote_candidates.len() > self.config.max_remote_candidates ||
1342 state.candidate_pairs.len() > self.config.max_candidate_pairs
1343 }
1344
1345 fn calculate_memory_pressure(&mut self, active_validations_len: usize, local_candidates_len: usize,
1347 remote_candidates_len: usize, candidate_pairs_len: usize) -> f64 {
1348 let total_limit = self.config.max_active_validations +
1349 self.config.max_local_candidates +
1350 self.config.max_remote_candidates +
1351 self.config.max_candidate_pairs;
1352
1353 let current_usage = active_validations_len +
1354 local_candidates_len +
1355 remote_candidates_len +
1356 candidate_pairs_len;
1357
1358 let pressure = current_usage as f64 / total_limit as f64;
1359 self.stats.memory_pressure = pressure;
1360 pressure
1361 }
1362
1363 fn should_cleanup(&self, now: Instant) -> bool {
1365 if self.shutdown_requested {
1366 return true;
1367 }
1368
1369 if let Some(last_cleanup) = self.last_cleanup {
1371 if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1372 return true;
1373 }
1374 } else {
1375 return true; }
1377
1378 if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1380 return true;
1381 }
1382
1383 false
1384 }
1385
1386 #[allow(dead_code)] fn cleanup_expired_resources(&mut self,
1389 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1390 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1391 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1392 candidate_pairs: &mut Vec<CandidatePair>,
1393 coordination: &mut Option<CoordinationState>,
1394 now: Instant) -> u64 {
1395 let mut cleaned = 0;
1396
1397 cleaned += self.cleanup_expired_validations(active_validations, now);
1399
1400 cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1402
1403 cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1405
1406 cleaned += self.cleanup_old_coordination(coordination, now);
1408
1409 self.stats.cleanup_operations += 1;
1411 self.stats.resources_cleaned += cleaned;
1412 self.last_cleanup = Some(now);
1413 self.cleanup_counter += 1;
1414
1415 debug!("Cleaned up {} expired resources", cleaned);
1416 cleaned
1417 }
1418
1419 #[allow(dead_code)] fn cleanup_expired_validations(&mut self, active_validations: &mut HashMap<SocketAddr, PathValidationState>, now: Instant) -> u64 {
1422 let mut cleaned = 0;
1423 let validation_timeout = self.config.validation_timeout;
1424
1425 active_validations.retain(|_addr, validation| {
1426 let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1427 if is_expired {
1428 cleaned += 1;
1429 trace!("Cleaned up expired validation for {:?}", _addr);
1430 }
1431 !is_expired
1432 });
1433
1434 cleaned
1435 }
1436
1437 #[allow(dead_code)] fn cleanup_stale_candidates(&mut self, local_candidates: &mut HashMap<VarInt, AddressCandidate>, remote_candidates: &mut HashMap<VarInt, AddressCandidate>, now: Instant) -> u64 {
1440 let mut cleaned = 0;
1441 let candidate_timeout = self.config.candidate_timeout;
1442
1443 local_candidates.retain(|_seq, candidate| {
1445 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout ||
1446 candidate.state == CandidateState::Failed ||
1447 candidate.state == CandidateState::Removed;
1448 if is_stale {
1449 cleaned += 1;
1450 trace!("Cleaned up stale local candidate {:?}", candidate.address);
1451 }
1452 !is_stale
1453 });
1454
1455 remote_candidates.retain(|_seq, candidate| {
1457 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout ||
1458 candidate.state == CandidateState::Failed ||
1459 candidate.state == CandidateState::Removed;
1460 if is_stale {
1461 cleaned += 1;
1462 trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1463 }
1464 !is_stale
1465 });
1466
1467 cleaned
1468 }
1469
1470 #[allow(dead_code)] fn cleanup_failed_pairs(&mut self, candidate_pairs: &mut Vec<CandidatePair>, now: Instant) -> u64 {
1473 let mut cleaned = 0;
1474 let pair_timeout = self.config.candidate_timeout;
1475
1476 candidate_pairs.retain(|pair| {
1477 let is_stale = now.duration_since(pair.created_at) > pair_timeout ||
1478 pair.state == PairState::Failed;
1479 if is_stale {
1480 cleaned += 1;
1481 trace!("Cleaned up failed candidate pair {:?} -> {:?}", pair.local_addr, pair.remote_addr);
1482 }
1483 !is_stale
1484 });
1485
1486 cleaned
1487 }
1488
1489 #[allow(dead_code)] fn cleanup_old_coordination(&mut self, coordination: &mut Option<CoordinationState>, now: Instant) -> u64 {
1492 let mut cleaned = 0;
1493
1494 if let Some(coord) = coordination {
1495 let is_expired = now.duration_since(coord.round_start) > self.config.coordination_timeout;
1496 let is_failed = coord.state == CoordinationPhase::Failed;
1497
1498 if is_expired || is_failed {
1499 let round = coord.round;
1500 *coordination = None;
1501 cleaned += 1;
1502 trace!("Cleaned up old coordination state for round {}", round);
1503 }
1504 }
1505
1506 cleaned
1507 }
1508
1509 #[allow(dead_code)] fn aggressive_cleanup(&mut self,
1512 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1513 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1514 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1515 candidate_pairs: &mut Vec<CandidatePair>,
1516 now: Instant) -> u64 {
1517 let mut cleaned = 0;
1518
1519 let aggressive_timeout = self.config.candidate_timeout / 2;
1521
1522 local_candidates.retain(|_seq, candidate| {
1524 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout &&
1525 candidate.state != CandidateState::Failed;
1526 if !keep {
1527 cleaned += 1;
1528 }
1529 keep
1530 });
1531
1532 remote_candidates.retain(|_seq, candidate| {
1533 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout &&
1534 candidate.state != CandidateState::Failed;
1535 if !keep {
1536 cleaned += 1;
1537 }
1538 keep
1539 });
1540
1541 candidate_pairs.retain(|pair| {
1543 let keep = pair.state != PairState::Waiting ||
1544 now.duration_since(pair.created_at) <= aggressive_timeout;
1545 if !keep {
1546 cleaned += 1;
1547 }
1548 keep
1549 });
1550
1551 active_validations.retain(|_addr, validation| {
1553 let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1554 if !keep {
1555 cleaned += 1;
1556 }
1557 keep
1558 });
1559
1560 warn!("Aggressive cleanup removed {} resources due to memory pressure", cleaned);
1561 cleaned
1562 }
1563
1564 #[allow(dead_code)] fn request_shutdown(&mut self) {
1567 self.shutdown_requested = true;
1568 debug!("Resource cleanup coordinator shutdown requested");
1569 }
1570
1571 #[allow(dead_code)] fn shutdown_cleanup(&mut self,
1574 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1575 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1576 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1577 candidate_pairs: &mut Vec<CandidatePair>,
1578 coordination: &mut Option<CoordinationState>) -> u64 {
1579 let mut cleaned = 0;
1580
1581 cleaned += active_validations.len() as u64;
1583 active_validations.clear();
1584
1585 cleaned += local_candidates.len() as u64;
1586 local_candidates.clear();
1587
1588 cleaned += remote_candidates.len() as u64;
1589 remote_candidates.clear();
1590
1591 cleaned += candidate_pairs.len() as u64;
1592 candidate_pairs.clear();
1593
1594 if coordination.is_some() {
1595 *coordination = None;
1596 cleaned += 1;
1597 }
1598
1599 info!("Shutdown cleanup removed {} resources", cleaned);
1600 cleaned
1601 }
1602
1603 #[allow(dead_code)] fn get_resource_stats(&self) -> &ResourceStats {
1606 &self.stats
1607 }
1608
1609 fn update_stats(&mut self, active_validations_len: usize, local_candidates_len: usize,
1611 remote_candidates_len: usize, candidate_pairs_len: usize) {
1612 self.stats.active_validations = active_validations_len;
1613 self.stats.local_candidates = local_candidates_len;
1614 self.stats.remote_candidates = remote_candidates_len;
1615 self.stats.candidate_pairs = candidate_pairs_len;
1616
1617 let current_usage = self.stats.active_validations +
1619 self.stats.local_candidates +
1620 self.stats.remote_candidates +
1621 self.stats.candidate_pairs;
1622
1623 if current_usage > self.stats.peak_memory_usage {
1624 self.stats.peak_memory_usage = current_usage;
1625 }
1626 }
1627
1628 pub(super) fn perform_cleanup(&mut self, now: Instant) {
1630 self.last_cleanup = Some(now);
1631 self.cleanup_counter += 1;
1632
1633 self.stats.cleanup_operations += 1;
1635
1636 debug!("Performed resource cleanup #{}", self.cleanup_counter);
1637 }
1638}
1639
1640impl NetworkConditionMonitor {
1641 fn new() -> Self {
1643 Self {
1644 rtt_samples: VecDeque::new(),
1645 max_samples: 20,
1646 packet_loss_rate: 0.0,
1647 congestion_window: 10,
1648 quality_score: 0.8, last_quality_update: Instant::now(),
1650 quality_update_interval: Duration::from_secs(10),
1651 timeout_stats: TimeoutStatistics::default(),
1652 }
1653 }
1654
1655 fn record_success(&mut self, rtt: Duration, now: Instant) {
1657 self.rtt_samples.push_back(rtt);
1659 if self.rtt_samples.len() > self.max_samples {
1660 self.rtt_samples.pop_front();
1661 }
1662
1663 self.timeout_stats.total_responses += 1;
1665 self.update_timeout_stats(now);
1666
1667 self.update_quality_score(now);
1669 }
1670
1671 fn record_timeout(&mut self, now: Instant) {
1673 self.timeout_stats.total_timeouts += 1;
1674 self.update_timeout_stats(now);
1675
1676 self.update_quality_score(now);
1678 }
1679
1680 fn update_timeout_stats(&mut self, now: Instant) {
1682 let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1683
1684 if total_attempts > 0 {
1685 self.timeout_stats.timeout_rate = self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1686 }
1687
1688 if !self.rtt_samples.is_empty() {
1690 let total_rtt: Duration = self.rtt_samples.iter().sum();
1691 self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1692 }
1693
1694 self.timeout_stats.last_update = Some(now);
1695 }
1696
1697 fn update_quality_score(&mut self, now: Instant) {
1699 if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1700 return;
1701 }
1702
1703 let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1705 let rtt_factor = self.calculate_rtt_factor();
1706 let consistency_factor = self.calculate_consistency_factor();
1707
1708 let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1710
1711 self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1713 self.last_quality_update = now;
1714 }
1715
1716 fn calculate_rtt_factor(&self) -> f64 {
1718 if self.rtt_samples.is_empty() {
1719 return 0.5; }
1721
1722 let avg_rtt = self.timeout_stats.avg_response_time;
1723
1724 let rtt_ms = avg_rtt.as_millis() as f64;
1726 let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1727 factor.clamp(0.0, 1.0)
1728 }
1729
1730 fn calculate_consistency_factor(&self) -> f64 {
1732 if self.rtt_samples.len() < 3 {
1733 return 0.5; }
1735
1736 let mean_rtt = self.timeout_stats.avg_response_time;
1738 let variance: f64 = self.rtt_samples
1739 .iter()
1740 .map(|rtt| {
1741 let diff = if *rtt > mean_rtt { *rtt - mean_rtt } else { mean_rtt - *rtt };
1742 diff.as_millis() as f64
1743 })
1744 .map(|diff| diff * diff)
1745 .sum::<f64>() / self.rtt_samples.len() as f64;
1746
1747 let std_dev = variance.sqrt();
1748
1749 let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1751 consistency.clamp(0.0, 1.0)
1752 }
1753
1754 fn get_quality_score(&self) -> f64 {
1756 self.quality_score
1757 }
1758
1759 fn get_estimated_rtt(&self) -> Option<Duration> {
1761 if self.rtt_samples.is_empty() {
1762 return None;
1763 }
1764
1765 Some(self.timeout_stats.avg_response_time)
1766 }
1767
1768 fn is_suitable_for_coordination(&self) -> bool {
1770 self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1772 }
1773
1774 #[allow(dead_code)] fn get_packet_loss_rate(&self) -> f64 {
1777 self.packet_loss_rate
1778 }
1779
1780 #[allow(dead_code)] fn get_timeout_multiplier(&self) -> f64 {
1783 let base_multiplier = 1.0;
1784
1785 let quality_multiplier = if self.quality_score < 0.3 {
1787 2.0 } else if self.quality_score > 0.8 {
1789 0.8 } else {
1791 1.0 };
1793
1794 let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1796
1797 base_multiplier * quality_multiplier * loss_multiplier
1798 }
1799
1800 #[allow(dead_code)] fn cleanup(&mut self, now: Instant) {
1803 let _cutoff_time = now - Duration::from_secs(60);
1805
1806 if let Some(last_update) = self.timeout_stats.last_update {
1808 if now.duration_since(last_update) > Duration::from_secs(300) {
1809 self.timeout_stats = TimeoutStatistics::default();
1810 }
1811 }
1812 }
1813}
1814
1815impl NatTraversalState {
1816 pub(super) fn new(
1818 role: NatTraversalRole,
1819 max_candidates: u32,
1820 coordination_timeout: Duration,
1821 ) -> Self {
1822 let bootstrap_coordinator = if matches!(role, NatTraversalRole::Bootstrap) {
1823 Some(BootstrapCoordinator::new(BootstrapConfig::default()))
1824 } else {
1825 None
1826 };
1827
1828 Self {
1829 role,
1830 local_candidates: HashMap::new(),
1831 remote_candidates: HashMap::new(),
1832 candidate_pairs: Vec::new(),
1833 pair_index: HashMap::new(),
1834 active_validations: HashMap::new(),
1835 coordination: None,
1836 next_sequence: VarInt::from_u32(1),
1837 max_candidates,
1838 coordination_timeout,
1839 stats: NatTraversalStats::default(),
1840 security_state: SecurityValidationState::new(),
1841 network_monitor: NetworkConditionMonitor::new(),
1842 resource_manager: ResourceCleanupCoordinator::new(),
1843 bootstrap_coordinator,
1844 multi_dest_transmitter: MultiDestinationTransmitter::new(),
1845 }
1846 }
1847
1848 pub(super) fn add_remote_candidate(
1850 &mut self,
1851 sequence: VarInt,
1852 address: SocketAddr,
1853 priority: VarInt,
1854 now: Instant,
1855 ) -> Result<(), NatTraversalError> {
1856 if self.should_reject_new_resources(now) {
1858 debug!("Rejecting new candidate due to resource limits: {}", address);
1859 return Err(NatTraversalError::ResourceLimitExceeded);
1860 }
1861
1862 if self.security_state.is_candidate_rate_limited(now) {
1864 self.stats.rate_limit_violations += 1;
1865 debug!("Rate limit exceeded for candidate addition: {}", address);
1866 return Err(NatTraversalError::RateLimitExceeded);
1867 }
1868
1869 match self.security_state.validate_address(address, now) {
1871 AddressValidationResult::Invalid => {
1872 self.stats.invalid_address_rejections += 1;
1873 self.stats.security_rejections += 1;
1874 debug!("Invalid address rejected: {}", address);
1875 return Err(NatTraversalError::InvalidAddress);
1876 }
1877 AddressValidationResult::Suspicious => {
1878 self.stats.security_rejections += 1;
1879 debug!("Suspicious address rejected: {}", address);
1880 return Err(NatTraversalError::SecurityValidationFailed);
1881 }
1882 AddressValidationResult::Valid => {
1883 }
1885 }
1886
1887 if self.remote_candidates.len() >= self.max_candidates as usize {
1889 return Err(NatTraversalError::TooManyCandidates);
1890 }
1891
1892 if self.remote_candidates.values()
1894 .any(|c| c.address == address && c.state != CandidateState::Removed)
1895 {
1896 return Err(NatTraversalError::DuplicateAddress);
1897 }
1898
1899 let candidate = AddressCandidate {
1900 address,
1901 priority: priority.into_inner() as u32,
1902 source: CandidateSource::Peer,
1903 discovered_at: now,
1904 state: CandidateState::New,
1905 attempt_count: 0,
1906 last_attempt: None,
1907 };
1908
1909 self.remote_candidates.insert(sequence, candidate);
1910 self.stats.remote_candidates_received += 1;
1911
1912 trace!("Added remote candidate: {} with priority {}", address, priority);
1913 Ok(())
1914 }
1915
1916 pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
1918 if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
1919 candidate.state = CandidateState::Removed;
1920
1921 self.active_validations.remove(&candidate.address);
1923 true
1924 } else {
1925 false
1926 }
1927 }
1928
1929 pub(super) fn add_local_candidate(
1931 &mut self,
1932 address: SocketAddr,
1933 source: CandidateSource,
1934 now: Instant,
1935 ) -> VarInt {
1936 let sequence = self.next_sequence;
1937 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
1938 .expect("sequence number overflow");
1939
1940 let candidate_type = classify_candidate_type(source);
1942 let local_preference = self.calculate_local_preference(address);
1943 let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
1944
1945 let candidate = AddressCandidate {
1946 address,
1947 priority,
1948 source,
1949 discovered_at: now,
1950 state: CandidateState::New,
1951 attempt_count: 0,
1952 last_attempt: None,
1953 };
1954
1955 self.local_candidates.insert(sequence, candidate);
1956 self.stats.local_candidates_sent += 1;
1957
1958 self.generate_candidate_pairs(now);
1960
1961 sequence
1962 }
1963
1964 fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
1966 match addr {
1967 SocketAddr::V4(v4) => {
1968 if v4.ip().is_loopback() {
1969 0 } else if v4.ip().is_private() {
1971 65000 } else {
1973 32000 }
1975 }
1976 SocketAddr::V6(v6) => {
1977 if v6.ip().is_loopback() {
1978 0
1979 } else if v6.ip().is_unicast_link_local() {
1980 30000 } else {
1982 50000 }
1984 }
1985 }
1986 }
1987
1988 pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
1990 self.candidate_pairs.clear();
1991 self.pair_index.clear();
1992
1993 let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
1995 self.candidate_pairs.reserve(estimated_capacity);
1996 self.pair_index.reserve(estimated_capacity);
1997
1998 let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
2000
2001 for (_local_seq, local_candidate) in &self.local_candidates {
2002 if local_candidate.state == CandidateState::Removed {
2004 continue;
2005 }
2006
2007 let local_type = classify_candidate_type(local_candidate.source);
2009
2010 for (remote_seq, remote_candidate) in &self.remote_candidates {
2011 if remote_candidate.state == CandidateState::Removed {
2013 continue;
2014 }
2015
2016 let cache_key = (local_candidate.address, remote_candidate.address);
2018 let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
2019 are_candidates_compatible(local_candidate, remote_candidate)
2020 });
2021
2022 if !compatible {
2023 continue;
2024 }
2025
2026 let pair_priority = calculate_pair_priority(
2028 local_candidate.priority,
2029 remote_candidate.priority
2030 );
2031
2032 let remote_type = classify_candidate_type(remote_candidate.source);
2034 let pair_type = classify_pair_type(local_type, remote_type);
2035
2036 let pair = CandidatePair {
2037 remote_sequence: *remote_seq,
2038 local_addr: local_candidate.address,
2039 remote_addr: remote_candidate.address,
2040 priority: pair_priority,
2041 state: PairState::Waiting,
2042 pair_type,
2043 created_at: now,
2044 last_check: None,
2045 };
2046
2047 let index = self.candidate_pairs.len();
2049 self.pair_index.insert(remote_candidate.address, index);
2050 self.candidate_pairs.push(pair);
2051 }
2052 }
2053
2054 self.candidate_pairs.sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2056
2057 self.pair_index.clear();
2059 for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2060 self.pair_index.insert(pair.remote_addr, idx);
2061 }
2062
2063 trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2064 }
2065
2066 pub(super) fn get_next_validation_pairs(&mut self, max_concurrent: usize) -> Vec<&mut CandidatePair> {
2068 let mut result = Vec::with_capacity(max_concurrent);
2071
2072 for pair in self.candidate_pairs.iter_mut() {
2073 if pair.state == PairState::Waiting {
2074 result.push(pair);
2075 if result.len() >= max_concurrent {
2076 break;
2077 }
2078 }
2079 }
2080
2081 result
2082 }
2083
2084 pub(super) fn find_pair_by_remote_addr(&mut self, addr: SocketAddr) -> Option<&mut CandidatePair> {
2086 if let Some(&index) = self.pair_index.get(&addr) {
2088 self.candidate_pairs.get_mut(index)
2089 } else {
2090 None
2091 }
2092 }
2093
2094 pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2096 let (succeeded_type, succeeded_priority) = {
2098 if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2099 pair.state = PairState::Succeeded;
2100 (pair.pair_type, pair.priority)
2101 } else {
2102 return false;
2103 }
2104 };
2105
2106 for other_pair in &mut self.candidate_pairs {
2108 if other_pair.pair_type == succeeded_type
2109 && other_pair.priority < succeeded_priority
2110 && other_pair.state == PairState::Waiting {
2111 other_pair.state = PairState::Frozen;
2112 }
2113 }
2114
2115 true
2116 }
2117
2118
2119 pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2121 let mut best_ipv4: Option<&CandidatePair> = None;
2122 let mut best_ipv6: Option<&CandidatePair> = None;
2123
2124 for pair in &self.candidate_pairs {
2125 if pair.state != PairState::Succeeded {
2126 continue;
2127 }
2128
2129 match pair.remote_addr {
2130 SocketAddr::V4(_) => {
2131 if best_ipv4.map_or(true, |best| pair.priority > best.priority) {
2132 best_ipv4 = Some(pair);
2133 }
2134 }
2135 SocketAddr::V6(_) => {
2136 if best_ipv6.map_or(true, |best| pair.priority > best.priority) {
2137 best_ipv6 = Some(pair);
2138 }
2139 }
2140 }
2141 }
2142
2143 let mut result = Vec::new();
2144 if let Some(pair) = best_ipv4 {
2145 result.push(pair);
2146 }
2147 if let Some(pair) = best_ipv6 {
2148 result.push(pair);
2149 }
2150 result
2151 }
2152
2153 pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2155 let mut candidates: Vec<_> = self.remote_candidates
2156 .iter()
2157 .filter(|(_, c)| c.state == CandidateState::New)
2158 .map(|(k, v)| (*k, v))
2159 .collect();
2160
2161 candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2163 candidates
2164 }
2165
2166 pub(super) fn start_validation(
2168 &mut self,
2169 sequence: VarInt,
2170 challenge: u64,
2171 now: Instant,
2172 ) -> Result<(), NatTraversalError> {
2173 let candidate = self.remote_candidates.get_mut(&sequence)
2174 .ok_or(NatTraversalError::UnknownCandidate)?;
2175
2176 if candidate.state != CandidateState::New {
2177 return Err(NatTraversalError::InvalidCandidateState);
2178 }
2179
2180 if Self::is_validation_suspicious(candidate, now) {
2182 self.stats.security_rejections += 1;
2183 debug!("Suspicious validation attempt rejected for address {}", candidate.address);
2184 return Err(NatTraversalError::SecurityValidationFailed);
2185 }
2186
2187 if self.active_validations.len() >= 10 {
2189 debug!("Too many concurrent validations, rejecting new validation for {}", candidate.address);
2190 return Err(NatTraversalError::SecurityValidationFailed);
2191 }
2192
2193 candidate.state = CandidateState::Validating;
2195 candidate.attempt_count += 1;
2196 candidate.last_attempt = Some(now);
2197
2198 let validation = PathValidationState {
2200 challenge,
2201 sent_at: now,
2202 retry_count: 0,
2203 max_retries: 3, coordination_round: self.coordination.as_ref().map(|c| c.round),
2205 timeout_state: AdaptiveTimeoutState::new(),
2206 last_retry_at: None,
2207 };
2208
2209 self.active_validations.insert(candidate.address, validation);
2210 trace!("Started validation for candidate {} with challenge {}", candidate.address, challenge);
2211 Ok(())
2212 }
2213
2214 fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2216 if candidate.attempt_count > 10 {
2218 return true;
2219 }
2220
2221 if let Some(last_attempt) = candidate.last_attempt {
2223 let time_since_last = now.duration_since(last_attempt);
2224 if time_since_last < Duration::from_millis(100) {
2225 return true; }
2227 }
2228
2229 if candidate.state == CandidateState::Failed {
2231 let time_since_discovery = now.duration_since(candidate.discovered_at);
2232 if time_since_discovery < Duration::from_secs(60) {
2233 return true; }
2235 }
2236
2237 false
2238 }
2239
2240 pub(super) fn handle_validation_success(
2242 &mut self,
2243 remote_addr: SocketAddr,
2244 challenge: u64,
2245 now: Instant,
2246 ) -> Result<VarInt, NatTraversalError> {
2247 let sequence = self.remote_candidates
2249 .iter()
2250 .find(|(_, c)| c.address == remote_addr)
2251 .map(|(seq, _)| *seq)
2252 .ok_or(NatTraversalError::UnknownCandidate)?;
2253
2254 let validation = self.active_validations.get_mut(&remote_addr)
2256 .ok_or(NatTraversalError::NoActiveValidation)?;
2257
2258 if validation.challenge != challenge {
2259 return Err(NatTraversalError::ChallengeMismatch);
2260 }
2261
2262 let rtt = now.duration_since(validation.sent_at);
2264 validation.timeout_state.update_success(rtt);
2265
2266 self.network_monitor.record_success(rtt, now);
2268
2269 let candidate = self.remote_candidates.get_mut(&sequence)
2271 .ok_or(NatTraversalError::UnknownCandidate)?;
2272
2273 candidate.state = CandidateState::Valid;
2274 self.active_validations.remove(&remote_addr);
2275 self.stats.validations_succeeded += 1;
2276
2277 trace!("Validation successful for {} with RTT {:?}", remote_addr, rtt);
2278 Ok(sequence)
2279 }
2280
2281
2282 pub(super) fn start_coordination_round(
2284 &mut self,
2285 targets: Vec<PunchTarget>,
2286 now: Instant,
2287 ) -> Result<VarInt, NatTraversalError> {
2288 if self.security_state.is_coordination_rate_limited(now) {
2290 self.stats.rate_limit_violations += 1;
2291 debug!("Rate limit exceeded for coordination request with {} targets", targets.len());
2292 return Err(NatTraversalError::RateLimitExceeded);
2293 }
2294
2295 if self.is_coordination_suspicious(&targets, now) {
2297 self.stats.suspicious_coordination_attempts += 1;
2298 self.stats.security_rejections += 1;
2299 debug!("Suspicious coordination request rejected with {} targets", targets.len());
2300 return Err(NatTraversalError::SuspiciousCoordination);
2301 }
2302
2303 for target in &targets {
2305 match self.security_state.validate_address(target.remote_addr, now) {
2306 AddressValidationResult::Invalid => {
2307 self.stats.invalid_address_rejections += 1;
2308 self.stats.security_rejections += 1;
2309 debug!("Invalid target address in coordination: {}", target.remote_addr);
2310 return Err(NatTraversalError::InvalidAddress);
2311 }
2312 AddressValidationResult::Suspicious => {
2313 self.stats.security_rejections += 1;
2314 debug!("Suspicious target address in coordination: {}", target.remote_addr);
2315 return Err(NatTraversalError::SecurityValidationFailed);
2316 }
2317 AddressValidationResult::Valid => {
2318 }
2320 }
2321 }
2322
2323 let round = self.next_sequence;
2324 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2325 .expect("sequence number overflow");
2326
2327 let coordination_grace = Duration::from_millis(500); let punch_start = now + coordination_grace;
2330
2331 self.coordination = Some(CoordinationState {
2332 round,
2333 punch_targets: targets,
2334 round_start: now,
2335 punch_start,
2336 round_duration: self.coordination_timeout,
2337 state: CoordinationPhase::Requesting,
2338 punch_request_sent: false,
2339 peer_punch_received: false,
2340 retry_count: 0,
2341 max_retries: 3,
2342 timeout_state: AdaptiveTimeoutState::new(),
2343 last_retry_at: None,
2344 });
2345
2346 self.stats.coordination_rounds += 1;
2347 trace!("Started coordination round {} with {} targets", round, self.coordination.as_ref().unwrap().punch_targets.len());
2348 Ok(round)
2349 }
2350
2351 fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2353 if targets.len() > 20 {
2355 return true;
2356 }
2357
2358 let mut seen_addresses = std::collections::HashSet::new();
2360 for target in targets {
2361 if !seen_addresses.insert(target.remote_addr) {
2362 return true; }
2364 }
2365
2366 if targets.len() > 5 {
2368 let mut ipv4_addresses: Vec<_> = targets
2370 .iter()
2371 .filter_map(|t| match t.remote_addr.ip() {
2372 IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2373 _ => None,
2374 })
2375 .collect();
2376
2377 if ipv4_addresses.len() >= 3 {
2378 ipv4_addresses.sort();
2379 let mut sequential_count = 1;
2380 for i in 1..ipv4_addresses.len() {
2381 if ipv4_addresses[i] == ipv4_addresses[i-1] + 1 {
2382 sequential_count += 1;
2383 if sequential_count >= 3 {
2384 return true; }
2386 } else {
2387 sequential_count = 1;
2388 }
2389 }
2390 }
2391 }
2392
2393 false
2394 }
2395
2396 pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2398 self.coordination.as_ref().map(|c| c.state)
2399 }
2400
2401 pub(super) fn should_send_punch_request(&self) -> bool {
2403 if let Some(coord) = &self.coordination {
2404 coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2405 } else {
2406 false
2407 }
2408 }
2409
2410 pub(super) fn mark_punch_request_sent(&mut self) {
2412 if let Some(coord) = &mut self.coordination {
2413 coord.punch_request_sent = true;
2414 coord.state = CoordinationPhase::Coordinating;
2415 trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2416 }
2417 }
2418
2419 pub(super) fn handle_peer_punch_request(&mut self, peer_round: VarInt, now: Instant) -> Result<bool, NatTraversalError> {
2421 if self.is_peer_coordination_suspicious(peer_round, now) {
2423 self.stats.suspicious_coordination_attempts += 1;
2424 self.stats.security_rejections += 1;
2425 debug!("Suspicious peer coordination request rejected for round {}", peer_round);
2426 return Err(NatTraversalError::SuspiciousCoordination);
2427 }
2428
2429 if let Some(coord) = &mut self.coordination {
2430 if coord.round == peer_round {
2431 match coord.state {
2432 CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2433 coord.peer_punch_received = true;
2434 coord.state = CoordinationPhase::Preparing;
2435
2436 let network_rtt = self.network_monitor.get_estimated_rtt()
2438 .unwrap_or(Duration::from_millis(100));
2439 let quality_score = self.network_monitor.get_quality_score();
2440
2441 let base_grace = Duration::from_millis(150);
2443 let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2444 let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2445
2446 let adaptive_grace = Duration::from_millis(
2447 (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64
2448 );
2449
2450 coord.punch_start = now + adaptive_grace;
2451
2452 trace!("Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2453 adaptive_grace, network_rtt, quality_score);
2454 Ok(true)
2455 }
2456 CoordinationPhase::Preparing => {
2457 trace!("Peer coordination confirmed during preparation");
2459 Ok(true)
2460 }
2461 _ => {
2462 debug!("Received coordination in unexpected phase: {:?}", coord.state);
2463 Ok(false)
2464 }
2465 }
2466 } else {
2467 debug!("Received coordination for wrong round: {} vs {}", peer_round, coord.round);
2468 Ok(false)
2469 }
2470 } else {
2471 debug!("Received peer coordination but no active round");
2472 Ok(false)
2473 }
2474 }
2475
2476 fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2478 if peer_round.into_inner() == 0 {
2480 return true; }
2482
2483 if let Some(coord) = &self.coordination {
2485 let our_round = coord.round.into_inner();
2486 let peer_round_num = peer_round.into_inner();
2487
2488 if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2490 return true;
2491 }
2492 }
2493
2494 false
2495 }
2496
2497 pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2499 if let Some(coord) = &self.coordination {
2500 match coord.state {
2501 CoordinationPhase::Preparing => now >= coord.punch_start,
2502 CoordinationPhase::Coordinating => {
2503 coord.peer_punch_received && now >= coord.punch_start
2505 }
2506 _ => false,
2507 }
2508 } else {
2509 false
2510 }
2511 }
2512
2513 pub(super) fn start_punching_phase(&mut self, now: Instant) {
2515 if let Some(coord) = &mut self.coordination {
2516 coord.state = CoordinationPhase::Punching;
2517
2518 let network_rtt = self.network_monitor.get_estimated_rtt()
2520 .unwrap_or(Duration::from_millis(100));
2521
2522 let jitter_ms: u64 = rand::random::<u64>() % 11;
2524 let jitter = Duration::from_millis(jitter_ms);
2525 let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2526
2527 coord.punch_start = transmission_time.max(now);
2529
2530 trace!("Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2531 coord.punch_start, network_rtt, jitter);
2532 }
2533 }
2534
2535 pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2537 self.coordination.as_ref().map(|c| c.punch_targets.as_slice())
2538 }
2539
2540
2541 pub(super) fn mark_coordination_validating(&mut self) {
2543 if let Some(coord) = &mut self.coordination {
2544 if coord.state == CoordinationPhase::Punching {
2545 coord.state = CoordinationPhase::Validating;
2546 trace!("Coordination moved to validation phase");
2547 }
2548 }
2549 }
2550
2551 pub(super) fn handle_coordination_success(&mut self, remote_addr: SocketAddr, now: Instant) -> bool {
2553 if let Some(coord) = &mut self.coordination {
2554 let was_target = coord.punch_targets.iter().any(|target| target.remote_addr == remote_addr);
2556
2557 if was_target && coord.state == CoordinationPhase::Validating {
2558 let rtt = now.duration_since(coord.round_start);
2560 coord.timeout_state.update_success(rtt);
2561 self.network_monitor.record_success(rtt, now);
2562
2563 coord.state = CoordinationPhase::Succeeded;
2564 self.stats.direct_connections += 1;
2565 trace!("Coordination succeeded via {} with RTT {:?}", remote_addr, rtt);
2566 true
2567 } else {
2568 false
2569 }
2570 } else {
2571 false
2572 }
2573 }
2574
2575 pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2577 if let Some(coord) = &mut self.coordination {
2578 coord.retry_count += 1;
2579 coord.timeout_state.update_timeout();
2580 self.network_monitor.record_timeout(now);
2581
2582 if coord.timeout_state.should_retry(coord.max_retries)
2584 && self.network_monitor.is_suitable_for_coordination() {
2585
2586 coord.state = CoordinationPhase::Requesting;
2588 coord.punch_request_sent = false;
2589 coord.peer_punch_received = false;
2590 coord.round_start = now;
2591 coord.last_retry_at = Some(now);
2592
2593 let retry_delay = coord.timeout_state.get_retry_delay();
2595
2596 let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2598 let adjusted_delay = Duration::from_millis(
2599 (retry_delay.as_millis() as f64 * quality_multiplier) as u64
2600 );
2601
2602 coord.punch_start = now + adjusted_delay;
2603
2604 trace!("Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2605 coord.round, coord.retry_count + 1, adjusted_delay, self.network_monitor.get_quality_score());
2606 true
2607 } else {
2608 coord.state = CoordinationPhase::Failed;
2609 self.stats.coordination_failures += 1;
2610
2611 if !self.network_monitor.is_suitable_for_coordination() {
2612 trace!("Coordination failed due to poor network conditions (quality: {:.2})",
2613 self.network_monitor.get_quality_score());
2614 } else {
2615 trace!("Coordination failed after {} attempts", coord.retry_count);
2616 }
2617 false
2618 }
2619 } else {
2620 false
2621 }
2622 }
2623
2624
2625 pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2627 if let Some(coord) = &mut self.coordination {
2628 let timeout = coord.timeout_state.get_timeout();
2629 let elapsed = now.duration_since(coord.round_start);
2630
2631 if elapsed > timeout {
2632 trace!("Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2633 coord.round, elapsed, timeout);
2634 self.handle_coordination_failure(now);
2635 true
2636 } else {
2637 false
2638 }
2639 } else {
2640 false
2641 }
2642 }
2643
2644
2645 pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2647 let mut expired_validations = Vec::new();
2648 let mut retry_validations = Vec::new();
2649
2650 for (addr, validation) in &mut self.active_validations {
2651 let timeout = validation.timeout_state.get_timeout();
2652 let elapsed = now.duration_since(validation.sent_at);
2653
2654 if elapsed >= timeout {
2655 if validation.timeout_state.should_retry(validation.max_retries) {
2656 retry_validations.push(*addr);
2658 } else {
2659 expired_validations.push(*addr);
2661 }
2662 }
2663 }
2664
2665 for addr in retry_validations {
2667 if let Some(validation) = self.active_validations.get_mut(&addr) {
2668 validation.retry_count += 1;
2669 validation.sent_at = now;
2670 validation.last_retry_at = Some(now);
2671 validation.timeout_state.update_timeout();
2672
2673 trace!("Retrying validation for {} (attempt {})", addr, validation.retry_count + 1);
2674 }
2675 }
2676
2677 for addr in &expired_validations {
2679 self.active_validations.remove(addr);
2680 self.network_monitor.record_timeout(now);
2681 trace!("Validation expired for {}", addr);
2682 }
2683
2684 expired_validations
2685 }
2686
2687 pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2689 let mut retry_addresses = Vec::new();
2690
2691 for (addr, validation) in &mut self.active_validations {
2693 let elapsed = now.duration_since(validation.sent_at);
2694 let timeout = validation.timeout_state.get_timeout();
2695
2696 if elapsed > timeout && validation.timeout_state.should_retry(validation.max_retries) {
2697 validation.retry_count += 1;
2699 validation.last_retry_at = Some(now);
2700 validation.sent_at = now; validation.timeout_state.update_timeout();
2702
2703 retry_addresses.push(*addr);
2704 trace!("Scheduled retry {} for validation to {}", validation.retry_count, addr);
2705 }
2706 }
2707
2708 retry_addresses
2709 }
2710
2711
2712 pub(super) fn update_network_conditions(&mut self, now: Instant) {
2714 self.network_monitor.cleanup(now);
2715
2716 let multiplier = self.network_monitor.get_timeout_multiplier();
2718
2719 for validation in self.active_validations.values_mut() {
2721 if multiplier > 1.5 {
2722 validation.timeout_state.backoff_multiplier =
2724 (validation.timeout_state.backoff_multiplier * 1.2).min(validation.timeout_state.max_backoff_multiplier);
2725 } else if multiplier < 0.8 {
2726 validation.timeout_state.backoff_multiplier =
2728 (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2729 }
2730 }
2731 }
2732
2733
2734 pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2736 if let Some(coord) = &self.coordination {
2737 if coord.retry_count > 0 {
2738 if let Some(last_retry) = coord.last_retry_at {
2739 let retry_delay = coord.timeout_state.get_retry_delay();
2740 return now.duration_since(last_retry) >= retry_delay;
2741 }
2742 }
2743 }
2744 false
2745 }
2746
2747
2748 pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2750 self.resource_manager.update_stats(
2752 self.active_validations.len(),
2753 self.local_candidates.len(),
2754 self.remote_candidates.len(),
2755 self.candidate_pairs.len()
2756 );
2757
2758 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2760 self.active_validations.len(),
2761 self.local_candidates.len(),
2762 self.remote_candidates.len(),
2763 self.candidate_pairs.len()
2764 );
2765
2766 let mut cleaned = 0;
2768
2769 if self.resource_manager.should_cleanup(now) {
2770 cleaned += self.resource_manager.cleanup_expired_resources(
2771 &mut self.active_validations,
2772 &mut self.local_candidates,
2773 &mut self.remote_candidates,
2774 &mut self.candidate_pairs,
2775 &mut self.coordination,
2776 now
2777 );
2778
2779 if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2781 cleaned += self.resource_manager.aggressive_cleanup(
2782 &mut self.active_validations,
2783 &mut self.local_candidates,
2784 &mut self.remote_candidates,
2785 &mut self.candidate_pairs,
2786 now
2787 );
2788 }
2789 }
2790
2791 cleaned
2792 }
2793
2794
2795 pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
2797 self.resource_manager.update_stats(
2799 self.active_validations.len(),
2800 self.local_candidates.len(),
2801 self.remote_candidates.len(),
2802 self.candidate_pairs.len()
2803 );
2804 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2805 self.active_validations.len(),
2806 self.local_candidates.len(),
2807 self.remote_candidates.len(),
2808 self.candidate_pairs.len()
2809 );
2810
2811 if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
2813 self.resource_manager.stats.allocation_failures += 1;
2814 return true;
2815 }
2816
2817 if self.resource_manager.check_resource_limits(self) {
2819 self.resource_manager.stats.allocation_failures += 1;
2820 return true;
2821 }
2822
2823 false
2824 }
2825
2826
2827 pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
2829 let mut next_timeout = None;
2830
2831 if let Some(coord) = &self.coordination {
2833 match coord.state {
2834 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2835 let timeout_at = coord.round_start + self.coordination_timeout;
2836 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2837 }
2838 CoordinationPhase::Preparing => {
2839 next_timeout = Some(next_timeout.map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)));
2841 }
2842 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2843 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2845 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2846 }
2847 _ => {}
2848 }
2849 }
2850
2851 for (_, validation) in &self.active_validations {
2853 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2854 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2855 }
2856
2857 if self.resource_manager.should_cleanup(now) {
2859 let cleanup_at = now + Duration::from_secs(1);
2861 next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
2862 }
2863
2864 next_timeout
2865 }
2866
2867 pub(super) fn handle_timeout(&mut self, now: Instant) -> Result<Vec<TimeoutAction>, NatTraversalError> {
2869 let mut actions = Vec::new();
2870
2871 if let Some(coord) = &mut self.coordination {
2873 match coord.state {
2874 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2875 let timeout_at = coord.round_start + self.coordination_timeout;
2876 if now >= timeout_at {
2877 coord.retry_count += 1;
2878 if coord.retry_count >= coord.max_retries {
2879 debug!("Coordination failed after {} retries", coord.retry_count);
2880 coord.state = CoordinationPhase::Failed;
2881 actions.push(TimeoutAction::Failed);
2882 } else {
2883 debug!("Coordination timeout, retrying ({}/{})", coord.retry_count, coord.max_retries);
2884 coord.state = CoordinationPhase::Requesting;
2885 coord.round_start = now;
2886 actions.push(TimeoutAction::RetryCoordination);
2887 }
2888 }
2889 }
2890 CoordinationPhase::Preparing => {
2891 if now >= coord.punch_start {
2893 debug!("Starting coordinated hole punching");
2894 coord.state = CoordinationPhase::Punching;
2895 actions.push(TimeoutAction::StartValidation);
2896 }
2897 }
2898 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2899 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2900 if now >= timeout_at {
2901 coord.retry_count += 1;
2902 if coord.retry_count >= coord.max_retries {
2903 debug!("Validation failed after {} retries", coord.retry_count);
2904 coord.state = CoordinationPhase::Failed;
2905 actions.push(TimeoutAction::Failed);
2906 } else {
2907 debug!("Validation timeout, retrying ({}/{})", coord.retry_count, coord.max_retries);
2908 coord.state = CoordinationPhase::Punching;
2909 actions.push(TimeoutAction::StartValidation);
2910 }
2911 }
2912 }
2913 CoordinationPhase::Succeeded => {
2914 actions.push(TimeoutAction::Complete);
2915 }
2916 CoordinationPhase::Failed => {
2917 actions.push(TimeoutAction::Failed);
2918 }
2919 _ => {}
2920 }
2921 }
2922
2923 let mut expired_validations = Vec::new();
2925 for (addr, validation) in &mut self.active_validations {
2926 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2927 if now >= timeout_at {
2928 validation.retry_count += 1;
2929 if validation.retry_count >= validation.max_retries {
2930 debug!("Path validation failed for {}: max retries exceeded", addr);
2931 expired_validations.push(*addr);
2932 } else {
2933 debug!("Path validation timeout for {}, retrying ({}/{})",
2934 addr, validation.retry_count, validation.max_retries);
2935 validation.sent_at = now;
2936 validation.last_retry_at = Some(now);
2937 actions.push(TimeoutAction::StartValidation);
2938 }
2939 }
2940 }
2941
2942 for addr in expired_validations {
2944 self.active_validations.remove(&addr);
2945 }
2946
2947 if self.resource_manager.should_cleanup(now) {
2949 self.resource_manager.perform_cleanup(now);
2950 }
2951
2952 self.network_monitor.update_quality_score(now);
2954
2955 if self.coordination.is_none() && !self.local_candidates.is_empty() && !self.remote_candidates.is_empty() {
2957 actions.push(TimeoutAction::RetryDiscovery);
2958 }
2959
2960 Ok(actions)
2961 }
2962
2963 pub(super) fn handle_address_observation(
2968 &mut self,
2969 peer_id: [u8; 32],
2970 observed_address: SocketAddr,
2971 connection_id: crate::shared::ConnectionId,
2972 peer_role: NatTraversalRole,
2973 now: Instant,
2974 ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
2975 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
2976 let connection_context = ConnectionContext {
2977 connection_id,
2978 original_destination: observed_address, peer_role,
2980 transport_params: None,
2981 };
2982
2983 bootstrap_coordinator.observe_peer_address(
2985 peer_id,
2986 observed_address,
2987 connection_context,
2988 now,
2989 )?;
2990
2991 let sequence = self.next_sequence;
2993 self.next_sequence = VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
2994
2995 let priority = VarInt::from_u32(100); let add_address_frame = bootstrap_coordinator.generate_add_address_frame(
2997 peer_id,
2998 sequence,
2999 priority,
3000 );
3001
3002 Ok(add_address_frame)
3003 } else {
3004 Ok(None)
3006 }
3007 }
3008
3009 pub(super) fn handle_punch_me_now_frame(
3014 &mut self,
3015 from_peer: [u8; 32],
3016 source_addr: SocketAddr,
3017 frame: &crate::frame::PunchMeNow,
3018 now: Instant,
3019 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3020 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3021 bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3022 } else {
3023 Ok(None)
3025 }
3026 }
3027
3028 #[allow(dead_code)] pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3033 self.bootstrap_coordinator
3034 .as_ref()
3035 .and_then(|coord| coord.get_peer_record(peer_id))
3036 .map(|record| record.observed_address)
3037 }
3038
3039 pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3041 debug!("Starting candidate discovery for NAT traversal");
3042
3043 if self.local_candidates.is_empty() {
3045 debug!("Local candidates will be populated by discovery manager");
3048 }
3049
3050 Ok(())
3051 }
3052
3053 pub(super) fn queue_add_address_frame(
3055 &mut self,
3056 sequence: VarInt,
3057 address: SocketAddr,
3058 priority: u32,
3059 ) -> Result<(), NatTraversalError> {
3060 debug!("Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3061 sequence, address, priority);
3062
3063 let candidate = AddressCandidate {
3065 address,
3066 priority,
3067 source: CandidateSource::Local,
3068 discovered_at: Instant::now(),
3069 state: CandidateState::New,
3070 attempt_count: 0,
3071 last_attempt: None,
3072 };
3073
3074 if !self.local_candidates.values().any(|c| c.address == address) {
3076 self.local_candidates.insert(sequence, candidate);
3077 }
3078
3079 Ok(())
3080 }
3081}
3082
3083#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3085#[allow(dead_code)] pub(crate) enum NatTraversalError {
3087 TooManyCandidates,
3089 DuplicateAddress,
3091 UnknownCandidate,
3093 InvalidCandidateState,
3095 NoActiveValidation,
3097 ChallengeMismatch,
3099 NoActiveCoordination,
3101 SecurityValidationFailed,
3103 RateLimitExceeded,
3105 InvalidAddress,
3107 SuspiciousCoordination,
3109 ResourceLimitExceeded,
3111}
3112
3113impl std::fmt::Display for NatTraversalError {
3114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3115 match self {
3116 Self::TooManyCandidates => write!(f, "too many candidates"),
3117 Self::DuplicateAddress => write!(f, "duplicate address"),
3118 Self::UnknownCandidate => write!(f, "unknown candidate"),
3119 Self::InvalidCandidateState => write!(f, "invalid candidate state"),
3120 Self::NoActiveValidation => write!(f, "no active validation"),
3121 Self::ChallengeMismatch => write!(f, "challenge mismatch"),
3122 Self::NoActiveCoordination => write!(f, "no active coordination"),
3123 Self::SecurityValidationFailed => write!(f, "security validation failed"),
3124 Self::RateLimitExceeded => write!(f, "rate limit exceeded"),
3125 Self::InvalidAddress => write!(f, "invalid address"),
3126 Self::SuspiciousCoordination => write!(f, "suspicious coordination request"),
3127 Self::ResourceLimitExceeded => write!(f, "resource limit exceeded"),
3128 }
3129 }
3130}
3131
3132impl std::error::Error for NatTraversalError {}
3133
3134#[derive(Debug, Clone)]
3136#[allow(dead_code)] pub(crate) struct SecurityStats {
3138 pub total_security_rejections: u32,
3140 pub rate_limit_violations: u32,
3142 pub invalid_address_rejections: u32,
3144 pub suspicious_coordination_attempts: u32,
3146 pub active_validations: usize,
3148 pub cached_address_validations: usize,
3150 pub current_candidate_rate: usize,
3152 pub current_coordination_rate: usize,
3154}
3155
3156#[derive(Debug)]
3161pub(crate) struct BootstrapCoordinator {
3162 peer_registry: HashMap<PeerId, PeerObservationRecord>,
3164 coordination_sessions: HashMap<CoordinationSessionId, CoordinationSession>,
3166 #[allow(dead_code)] pending_coordination: VecDeque<PendingCoordinationRequest>,
3169 #[allow(dead_code)] address_observations: HashMap<SocketAddr, AddressObservation>,
3172 security_validator: SecurityValidationState,
3174 stats: BootstrapStats,
3176 _config: BootstrapConfig,
3178 _last_cleanup: Option<Instant>,
3180}
3181
3182type CoordinationSessionId = u64;
3184
3185type PeerId = [u8; 32];
3187
3188#[derive(Debug, Clone)]
3190pub(crate) struct PeerObservationRecord {
3191 #[allow(dead_code)] peer_id: PeerId,
3194 observed_address: SocketAddr,
3196 #[allow(dead_code)] observed_at: Instant,
3199 #[allow(dead_code)] connection_context: ConnectionContext,
3202 #[allow(dead_code)] can_coordinate: bool,
3205 #[allow(dead_code)] coordination_count: u32,
3208 #[allow(dead_code)] success_rate: f64,
3211}
3212
3213#[derive(Debug, Clone)]
3215pub(crate) struct ConnectionContext {
3216 #[allow(dead_code)] connection_id: ConnectionId,
3219 #[allow(dead_code)] original_destination: SocketAddr,
3222 #[allow(dead_code)] peer_role: NatTraversalRole,
3225 #[allow(dead_code)] transport_params: Option<NatTraversalTransportParams>,
3228}
3229
3230#[derive(Debug, Clone)]
3232struct NatTraversalTransportParams {
3233 #[allow(dead_code)] max_candidates: u32,
3236 #[allow(dead_code)] coordination_timeout: Duration,
3239 #[allow(dead_code)] supports_advanced_features: bool,
3242}
3243
3244#[derive(Debug, Clone)]
3246struct AddressObservation {
3247 #[allow(dead_code)] address: SocketAddr,
3250 #[allow(dead_code)] first_observed: Instant,
3253 #[allow(dead_code)] observation_count: u32,
3256 #[allow(dead_code)] validation_state: AddressValidationResult,
3259 #[allow(dead_code)] associated_peers: Vec<PeerId>,
3262}
3263
3264#[derive(Debug, Clone)]
3266#[allow(dead_code)] pub(crate) struct CoordinationSession {
3268 session_id: CoordinationSessionId,
3270 peer_a: PeerId,
3272 peer_b: PeerId,
3274 current_round: VarInt,
3276 started_at: Instant,
3278 phase: CoordinationPhase,
3280 target_addresses: Vec<(SocketAddr, VarInt)>, sync_state: SynchronizationState,
3284 stats: CoordinationSessionStats,
3286}
3287
3288#[derive(Debug, Clone)]
3290struct SynchronizationState {
3291 peer_a_ready: bool,
3293 peer_b_ready: bool,
3295}
3296
3297#[derive(Debug, Clone, Default)]
3299struct CoordinationSessionStats {
3300 successful_coordinations: u32,
3302}
3303
3304#[derive(Debug, Clone)]
3306struct PendingCoordinationRequest {
3307 _unused: (),
3308}
3309
3310#[derive(Debug, Clone)]
3312pub(crate) struct BootstrapConfig {
3313 _unused: (),
3314}
3315
3316#[derive(Debug, Clone, Default)]
3318pub(crate) struct BootstrapStats {
3319 #[allow(dead_code)] total_observations: u64,
3322 total_coordinations: u64,
3324 successful_coordinations: u64,
3326 #[allow(dead_code)] active_peers: usize,
3329 active_sessions: usize,
3331 security_rejections: u64,
3333}
3334
3335#[derive(Debug, Clone)]
3337#[allow(dead_code)] pub(crate) enum CoordinationSessionEvent {
3339 PhaseChanged {
3341 session_id: CoordinationSessionId,
3342 old_phase: CoordinationPhase,
3343 new_phase: CoordinationPhase,
3344 },
3345 SessionFailed {
3347 session_id: CoordinationSessionId,
3348 peer_a: PeerId,
3349 peer_b: PeerId,
3350 reason: String,
3351 },
3352 StartHolePunching {
3354 session_id: CoordinationSessionId,
3355 peer_a: PeerId,
3356 peer_b: PeerId,
3357 target_addresses: Vec<(SocketAddr, VarInt)>,
3358 },
3359 ReadyForCleanup {
3361 session_id: CoordinationSessionId,
3362 },
3363}
3364
3365#[derive(Debug, Clone, Copy)]
3367#[allow(dead_code)] enum SessionAdvancementEvent {
3369 BothPeersReady,
3371 CoordinationComplete,
3373 PreparationComplete,
3375 PunchingComplete,
3377 ValidationTimeout,
3379 Timeout,
3381 ReadyForCleanup,
3383}
3384
3385#[derive(Debug, Clone, Copy)]
3387#[allow(dead_code)] pub(crate) enum CoordinationRecoveryAction {
3389 NoAction,
3391 RetryWithBackoff,
3393 MarkAsFailed,
3395 Cleanup,
3397}
3398
3399impl BootstrapCoordinator {
3400 pub(crate) fn new(config: BootstrapConfig) -> Self {
3402 Self {
3403 peer_registry: HashMap::new(),
3404 coordination_sessions: HashMap::new(),
3405 pending_coordination: VecDeque::new(),
3406 address_observations: HashMap::new(),
3407 security_validator: SecurityValidationState::new(),
3408 stats: BootstrapStats::default(),
3409 _config: config,
3410 _last_cleanup: None,
3411 }
3412 }
3413
3414 pub(crate) fn observe_peer_address(
3419 &mut self,
3420 peer_id: PeerId,
3421 observed_address: SocketAddr,
3422 connection_context: ConnectionContext,
3423 now: Instant,
3424 ) -> Result<(), NatTraversalError> {
3425 match self.security_validator.validate_address(observed_address, now) {
3427 AddressValidationResult::Valid => {},
3428 AddressValidationResult::Invalid => {
3429 self.stats.security_rejections += 1;
3430 return Err(NatTraversalError::InvalidAddress);
3431 }
3432 AddressValidationResult::Suspicious => {
3433 self.stats.security_rejections += 1;
3434 return Err(NatTraversalError::SecurityValidationFailed);
3435 }
3436 }
3437
3438 if self.security_validator.is_candidate_rate_limited(now) {
3440 self.stats.security_rejections += 1;
3441 return Err(NatTraversalError::RateLimitExceeded);
3442 }
3443
3444 let observation = self.address_observations.entry(observed_address)
3446 .or_insert_with(|| AddressObservation {
3447 address: observed_address,
3448 first_observed: now,
3449 observation_count: 0,
3450 validation_state: AddressValidationResult::Valid,
3451 associated_peers: Vec::new(),
3452 });
3453
3454 observation.observation_count += 1;
3455 if !observation.associated_peers.contains(&peer_id) {
3456 observation.associated_peers.push(peer_id);
3457 }
3458
3459 let peer_record = PeerObservationRecord {
3461 peer_id,
3462 observed_address,
3463 observed_at: now,
3464 connection_context,
3465 can_coordinate: true, coordination_count: 0,
3467 success_rate: 1.0,
3468 };
3469
3470 self.peer_registry.insert(peer_id, peer_record);
3471 self.stats.total_observations += 1;
3472 self.stats.active_peers = self.peer_registry.len();
3473
3474 debug!("Observed peer {:?} at address {} (total observations: {})",
3475 peer_id, observed_address, self.stats.total_observations);
3476
3477 Ok(())
3478 }
3479
3480 pub(crate) fn generate_add_address_frame(
3485 &self,
3486 peer_id: PeerId,
3487 sequence: VarInt,
3488 priority: VarInt,
3489 ) -> Option<crate::frame::AddAddress> {
3490 if let Some(peer_record) = self.peer_registry.get(&peer_id) {
3491 Some(crate::frame::AddAddress {
3492 sequence,
3493 address: peer_record.observed_address,
3494 priority,
3495 })
3496 } else {
3497 None
3498 }
3499 }
3500
3501 pub(crate) fn process_punch_me_now_frame(
3506 &mut self,
3507 from_peer: PeerId,
3508 source_addr: SocketAddr,
3509 frame: &crate::frame::PunchMeNow,
3510 now: Instant,
3511 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3512 if self.security_validator.is_adaptive_rate_limited(from_peer, now) {
3514 self.stats.security_rejections += 1;
3515 debug!("PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3516 hex::encode(&from_peer[..8]));
3517 return Err(NatTraversalError::RateLimitExceeded);
3518 }
3519
3520 self.security_validator.enhanced_address_validation(frame.local_address, source_addr, now)
3522 .map_err(|e| {
3523 self.stats.security_rejections += 1;
3524 debug!("PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3525 hex::encode(&from_peer[..8]), e);
3526 e
3527 })?;
3528
3529 self.security_validator.validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3531 .map_err(|e| {
3532 self.stats.security_rejections += 1;
3533 debug!("PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3534 hex::encode(&from_peer[..8]), e);
3535 e
3536 })?;
3537
3538 if let Some(target_peer_id) = frame.target_peer_id {
3540 if let Some(target_peer) = self.peer_registry.get(&target_peer_id) {
3542 let session_id = self.generate_session_id();
3544
3545 if !self.coordination_sessions.contains_key(&session_id) {
3546 let _network_rtt = self.estimate_peer_rtt(&from_peer)
3548 .unwrap_or(Duration::from_millis(100));
3549
3550 let session = CoordinationSession {
3551 session_id,
3552 peer_a: from_peer,
3553 peer_b: target_peer_id,
3554 current_round: frame.round,
3555 started_at: now,
3556 phase: CoordinationPhase::Requesting,
3557 target_addresses: vec![(frame.local_address, frame.target_sequence)],
3558 sync_state: SynchronizationState {
3559 peer_a_ready: true, peer_b_ready: false,
3561 },
3562 stats: CoordinationSessionStats::default(),
3563 };
3564
3565 self.coordination_sessions.insert(session_id, session);
3566 self.stats.total_coordinations += 1;
3567 self.stats.active_sessions = self.coordination_sessions.len();
3568 }
3569
3570 let coordination_frame = crate::frame::PunchMeNow {
3572 round: frame.round,
3573 target_sequence: frame.target_sequence,
3574 local_address: target_peer.observed_address,
3575 target_peer_id: Some(from_peer),
3576 };
3577
3578 info!("Coordinating hole punch between {:?} and {:?} (round: {})",
3579 from_peer, target_peer_id, frame.round);
3580
3581 Ok(Some(coordination_frame))
3582 } else {
3583 warn!("Target peer {:?} not found for coordination from {:?}",
3585 target_peer_id, from_peer);
3586 Ok(None)
3587 }
3588 } else {
3589 let session_id = if let Some(session) = self.find_coordination_session_by_peer(from_peer, frame.round) {
3591 session.sync_state.peer_b_ready = true;
3592
3593 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3595 session.phase = CoordinationPhase::Punching;
3596 session.stats.successful_coordinations += 1;
3597 Some(session.session_id)
3598 } else {
3599 None
3600 }
3601 } else {
3602 None
3603 };
3604
3605 if let Some(session_id) = session_id {
3607 self.stats.successful_coordinations += 1;
3608 info!("Coordination complete for session {} (round: {})",
3609 session_id, frame.round);
3610 }
3611
3612 Ok(None)
3613 }
3614 }
3615
3616 fn find_coordination_session_by_peer(
3618 &mut self,
3619 peer_id: PeerId,
3620 round: VarInt,
3621 ) -> Option<&mut CoordinationSession> {
3622 self.coordination_sessions.values_mut().find(|session| {
3623 (session.peer_a == peer_id || session.peer_b == peer_id) &&
3624 session.current_round == round
3625 })
3626 }
3627
3628 fn generate_session_id(&self) -> CoordinationSessionId {
3630 rand::random()
3631 }
3632
3633 pub(crate) fn generate_secure_coordination_round(&self) -> VarInt {
3635 self.security_validator.generate_secure_coordination_round()
3636 }
3637
3638 pub(crate) fn validate_coordination_security(
3640 &mut self,
3641 peer_id: PeerId,
3642 source_addr: SocketAddr,
3643 target_addr: SocketAddr,
3644 now: Instant,
3645 ) -> Result<(), NatTraversalError> {
3646 if self.security_validator.is_adaptive_rate_limited(peer_id, now) {
3648 self.stats.security_rejections += 1;
3649 return Err(NatTraversalError::RateLimitExceeded);
3650 }
3651
3652 self.security_validator.enhanced_address_validation(target_addr, source_addr, now)?;
3654
3655 self.security_validator.validate_amplification_limits(source_addr, target_addr, now)?;
3657
3658 Ok(())
3659 }
3660
3661 pub(crate) fn cleanup_expired_sessions(&mut self, now: Instant) {
3663 let session_timeout = Duration::from_secs(300); let expired_sessions: Vec<CoordinationSessionId> = self.coordination_sessions
3667 .iter()
3668 .filter(|(_, session)| {
3669 now.duration_since(session.started_at) > session_timeout
3670 })
3671 .map(|(&session_id, _)| session_id)
3672 .collect();
3673
3674 for session_id in expired_sessions {
3676 if let Some(session) = self.coordination_sessions.remove(&session_id) {
3677 debug!("Cleaned up expired coordination session {} between {:?} and {:?}",
3678 session_id, hex::encode(&session.peer_a[..8]), hex::encode(&session.peer_b[..8]));
3679 }
3680 }
3681
3682 self.stats.active_sessions = self.coordination_sessions.len();
3684
3685 let observation_timeout = Duration::from_secs(3600); self.peer_registry.retain(|_, record| {
3688 now.duration_since(record.observed_at) <= observation_timeout
3689 });
3690
3691 self.stats.active_peers = self.peer_registry.len();
3693
3694 self.address_observations.retain(|_, observation| {
3696 now.duration_since(observation.first_observed) <= observation_timeout
3697 });
3698 }
3699
3700 pub(crate) fn get_stats(&self) -> &BootstrapStats {
3702 &self.stats
3703 }
3704
3705 pub(crate) fn update_peer_coordination_stats(
3707 &mut self,
3708 peer_id: PeerId,
3709 success: bool,
3710 ) {
3711 if let Some(peer_record) = self.peer_registry.get_mut(&peer_id) {
3712 peer_record.coordination_count += 1;
3713
3714 if success {
3715 let alpha = 0.1; peer_record.success_rate = peer_record.success_rate * (1.0 - alpha) + alpha;
3718 } else {
3719 let alpha = 0.1;
3721 peer_record.success_rate = peer_record.success_rate * (1.0 - alpha);
3722 }
3723
3724 if peer_record.success_rate < 0.1 && peer_record.coordination_count > 10 {
3726 peer_record.can_coordinate = false;
3727 warn!("Disabled coordination for peer {:?} due to low success rate: {:.2}",
3728 hex::encode(&peer_id[..8]), peer_record.success_rate);
3729 }
3730 }
3731 }
3732
3733 pub(crate) fn poll_session_state_machine(&mut self, now: Instant) -> Vec<CoordinationSessionEvent> {
3738 let mut events = Vec::new();
3739 let mut sessions_to_update = Vec::new();
3740
3741 for (&session_id, session) in &self.coordination_sessions {
3743 if let Some(event) = self.should_advance_session(session, now) {
3744 sessions_to_update.push((session_id, event));
3745 }
3746 }
3747
3748 for (session_id, event) in sessions_to_update {
3750 let session_events = if let Some(session) = self.coordination_sessions.get_mut(&session_id) {
3751 let peer_a = session.peer_a;
3752 let peer_b = session.peer_b;
3753
3754 match Self::advance_session_state_static(session, event, now) {
3755 Ok(session_events) => session_events,
3756 Err(e) => {
3757 warn!("Failed to advance session {} state: {:?}", session_id, e);
3758 session.phase = CoordinationPhase::Failed;
3760 vec![CoordinationSessionEvent::SessionFailed {
3761 session_id,
3762 peer_a,
3763 peer_b,
3764 reason: format!("State advancement error: {:?}", e),
3765 }]
3766 }
3767 }
3768 } else {
3769 Vec::new()
3770 };
3771
3772 events.extend(session_events);
3773 }
3774
3775 self.cleanup_completed_sessions(now);
3777
3778 events
3779 }
3780
3781 fn should_advance_session(&self, session: &CoordinationSession, now: Instant) -> Option<SessionAdvancementEvent> {
3783 let session_age = now.duration_since(session.started_at);
3784
3785 match session.phase {
3786 CoordinationPhase::Requesting => {
3787 if session_age > Duration::from_secs(10) {
3789 Some(SessionAdvancementEvent::Timeout)
3790 } else if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3791 Some(SessionAdvancementEvent::BothPeersReady)
3792 } else {
3793 None
3794 }
3795 }
3796 CoordinationPhase::Coordinating => {
3797 if session_age > Duration::from_millis(500) {
3799 Some(SessionAdvancementEvent::CoordinationComplete)
3800 } else {
3801 None
3802 }
3803 }
3804 CoordinationPhase::Preparing => {
3805 if session_age > Duration::from_secs(1) {
3807 Some(SessionAdvancementEvent::PreparationComplete)
3808 } else {
3809 None
3810 }
3811 }
3812 CoordinationPhase::Punching => {
3813 if session_age > Duration::from_secs(2) {
3815 Some(SessionAdvancementEvent::PunchingComplete)
3816 } else {
3817 None
3818 }
3819 }
3820 CoordinationPhase::Validating => {
3821 if session_age > Duration::from_secs(10) {
3823 Some(SessionAdvancementEvent::ValidationTimeout)
3824 } else {
3825 None
3826 }
3827 }
3828 CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
3829 if session_age > Duration::from_secs(60) {
3831 Some(SessionAdvancementEvent::ReadyForCleanup)
3832 } else {
3833 None
3834 }
3835 }
3836 CoordinationPhase::Idle => {
3837 Some(SessionAdvancementEvent::Timeout)
3839 }
3840 }
3841 }
3842
3843 fn advance_session_state_static(
3845 session: &mut CoordinationSession,
3846 event: SessionAdvancementEvent,
3847 _now: Instant,
3848 ) -> Result<Vec<CoordinationSessionEvent>, NatTraversalError> {
3849 let mut events = Vec::new();
3850 let previous_phase = session.phase;
3851
3852 match (session.phase, event) {
3853 (CoordinationPhase::Requesting, SessionAdvancementEvent::BothPeersReady) => {
3854 session.phase = CoordinationPhase::Coordinating;
3855 debug!("Session {} advanced from Requesting to Coordinating", session.session_id);
3856 events.push(CoordinationSessionEvent::PhaseChanged {
3857 session_id: session.session_id,
3858 old_phase: previous_phase,
3859 new_phase: session.phase,
3860 });
3861 }
3862 (CoordinationPhase::Requesting, SessionAdvancementEvent::Timeout) => {
3863 session.phase = CoordinationPhase::Failed;
3864 warn!("Session {} timed out in Requesting phase", session.session_id);
3865 events.push(CoordinationSessionEvent::SessionFailed {
3866 session_id: session.session_id,
3867 peer_a: session.peer_a,
3868 peer_b: session.peer_b,
3869 reason: "Timeout waiting for peer responses".to_string(),
3870 });
3871 }
3872 (CoordinationPhase::Coordinating, SessionAdvancementEvent::CoordinationComplete) => {
3873 session.phase = CoordinationPhase::Preparing;
3874 debug!("Session {} advanced from Coordinating to Preparing", session.session_id);
3875 events.push(CoordinationSessionEvent::PhaseChanged {
3876 session_id: session.session_id,
3877 old_phase: previous_phase,
3878 new_phase: session.phase,
3879 });
3880 }
3881 (CoordinationPhase::Preparing, SessionAdvancementEvent::PreparationComplete) => {
3882 session.phase = CoordinationPhase::Punching;
3883 debug!("Session {} advanced from Preparing to Punching", session.session_id);
3884 events.push(CoordinationSessionEvent::PhaseChanged {
3885 session_id: session.session_id,
3886 old_phase: previous_phase,
3887 new_phase: session.phase,
3888 });
3889 events.push(CoordinationSessionEvent::StartHolePunching {
3890 session_id: session.session_id,
3891 peer_a: session.peer_a,
3892 peer_b: session.peer_b,
3893 target_addresses: session.target_addresses.clone(),
3894 });
3895 }
3896 (CoordinationPhase::Punching, SessionAdvancementEvent::PunchingComplete) => {
3897 session.phase = CoordinationPhase::Validating;
3898 debug!("Session {} advanced from Punching to Validating", session.session_id);
3899 events.push(CoordinationSessionEvent::PhaseChanged {
3900 session_id: session.session_id,
3901 old_phase: previous_phase,
3902 new_phase: session.phase,
3903 });
3904 }
3905 (CoordinationPhase::Validating, SessionAdvancementEvent::ValidationTimeout) => {
3906 session.phase = CoordinationPhase::Failed;
3907 warn!("Session {} validation timed out", session.session_id);
3908 events.push(CoordinationSessionEvent::SessionFailed {
3909 session_id: session.session_id,
3910 peer_a: session.peer_a,
3911 peer_b: session.peer_b,
3912 reason: "Validation timeout".to_string(),
3913 });
3914 }
3915 (phase, SessionAdvancementEvent::ReadyForCleanup) => {
3916 debug!("Session {} ready for cleanup in phase {:?}", session.session_id, phase);
3917 events.push(CoordinationSessionEvent::ReadyForCleanup {
3918 session_id: session.session_id,
3919 });
3920 }
3921 _ => {
3922 warn!("Invalid state transition for session {}: {:?} -> {:?}",
3924 session.session_id, session.phase, event);
3925 }
3926 }
3927
3928 Ok(events)
3929 }
3930
3931 fn cleanup_completed_sessions(&mut self, now: Instant) {
3933 let cleanup_timeout = Duration::from_secs(300); let sessions_to_remove: Vec<CoordinationSessionId> = self.coordination_sessions
3936 .iter()
3937 .filter(|(_, session)| {
3938 matches!(session.phase, CoordinationPhase::Succeeded | CoordinationPhase::Failed) &&
3939 now.duration_since(session.started_at) > cleanup_timeout
3940 })
3941 .map(|(&session_id, _)| session_id)
3942 .collect();
3943
3944 for session_id in sessions_to_remove {
3945 if let Some(session) = self.coordination_sessions.remove(&session_id) {
3946 debug!("Cleaned up completed session {} in phase {:?}",
3947 session_id, session.phase);
3948 }
3949 }
3950
3951 self.stats.active_sessions = self.coordination_sessions.len();
3952 }
3953
3954 pub(crate) fn retry_failed_coordination(
3959 &mut self,
3960 session_id: CoordinationSessionId,
3961 now: Instant,
3962 ) -> Result<bool, NatTraversalError> {
3963 let session = self.coordination_sessions.get_mut(&session_id)
3964 .ok_or(NatTraversalError::NoActiveCoordination)?;
3965
3966 if !matches!(session.phase, CoordinationPhase::Failed) {
3968 return Ok(false);
3969 }
3970
3971 let base_delay = Duration::from_secs(1);
3973 let max_delay = Duration::from_secs(60);
3974 let retry_count = session.stats.successful_coordinations; let delay = std::cmp::min(
3977 base_delay * 2_u32.pow(retry_count.min(10)), max_delay
3979 );
3980
3981 let _jitter_factor = 0.1;
3983 let jitter = Duration::from_millis((rand::random::<u64>() % 100) * delay.as_millis() as u64 / 1000);
3984 let total_delay = delay + jitter;
3985
3986 if now.duration_since(session.started_at) < total_delay {
3988 return Ok(false);
3989 }
3990
3991 const MAX_RETRIES: u32 = 5;
3993 if retry_count >= MAX_RETRIES {
3994 warn!("Session {} exceeded maximum retry attempts ({})", session_id, MAX_RETRIES);
3995 return Ok(false);
3996 }
3997
3998 session.phase = CoordinationPhase::Requesting;
4000 session.started_at = now;
4001 session.sync_state.peer_a_ready = false;
4002 session.sync_state.peer_b_ready = false;
4003 session.stats.successful_coordinations += 1; info!("Retrying coordination session {} (attempt {})", session_id, retry_count + 1);
4006 Ok(true)
4007 }
4008
4009 pub(crate) fn handle_coordination_error(
4011 &mut self,
4012 session_id: CoordinationSessionId,
4013 error: NatTraversalError,
4014 _now: Instant,
4015 ) -> CoordinationRecoveryAction {
4016 let session = match self.coordination_sessions.get_mut(&session_id) {
4017 Some(session) => session,
4018 None => return CoordinationRecoveryAction::NoAction,
4019 };
4020
4021 match error {
4022 NatTraversalError::RateLimitExceeded => {
4023 warn!("Rate limit exceeded for session {}, will retry", session_id);
4025 CoordinationRecoveryAction::RetryWithBackoff
4026 }
4027 NatTraversalError::SecurityValidationFailed | NatTraversalError::SuspiciousCoordination => {
4028 session.phase = CoordinationPhase::Failed;
4030 warn!("Security validation failed for session {}, marking as failed", session_id);
4031 CoordinationRecoveryAction::MarkAsFailed
4032 }
4033 NatTraversalError::InvalidAddress => {
4034 warn!("Invalid address in session {}, allowing retry", session_id);
4036 CoordinationRecoveryAction::RetryWithBackoff
4037 }
4038 NatTraversalError::NoActiveCoordination => {
4039 warn!("No active coordination for session {}, cleaning up", session_id);
4041 CoordinationRecoveryAction::Cleanup
4042 }
4043 _ => {
4044 warn!("Coordination error for session {}: {:?}, will retry", session_id, error);
4046 CoordinationRecoveryAction::RetryWithBackoff
4047 }
4048 }
4049 }
4050
4051 fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
4053 if let Some(_peer_record) = self.peer_registry.get(peer_id) {
4056 Some(Duration::from_millis(100))
4058 } else {
4059 None
4060 }
4061 }
4062
4063
4064 pub(crate) fn coordinate_hole_punching(
4069 &mut self,
4070 peer_a: PeerId,
4071 peer_b: PeerId,
4072 round: VarInt,
4073 now: Instant,
4074 ) -> Result<CoordinationSessionId, NatTraversalError> {
4075 let peer_a_record = self.peer_registry.get(&peer_a)
4077 .ok_or(NatTraversalError::UnknownCandidate)?;
4078 let peer_b_record = self.peer_registry.get(&peer_b)
4079 .ok_or(NatTraversalError::UnknownCandidate)?;
4080
4081 if !peer_a_record.can_coordinate || !peer_b_record.can_coordinate {
4082 return Err(NatTraversalError::InvalidCandidateState);
4083 }
4084
4085 let session_id = self.generate_session_id();
4087
4088 let session = CoordinationSession {
4090 session_id,
4091 peer_a,
4092 peer_b,
4093 current_round: round,
4094 started_at: now,
4095 phase: CoordinationPhase::Requesting,
4096 target_addresses: vec![
4097 (peer_a_record.observed_address, VarInt::from_u32(0)),
4098 (peer_b_record.observed_address, VarInt::from_u32(1)),
4099 ],
4100 sync_state: SynchronizationState {
4101 peer_a_ready: false,
4102 peer_b_ready: false,
4103 },
4104 stats: CoordinationSessionStats::default(),
4105 };
4106
4107 self.coordination_sessions.insert(session_id, session);
4108 self.stats.total_coordinations += 1;
4109 self.stats.active_sessions = self.coordination_sessions.len();
4110
4111 info!("Started coordination session {} between peers {:?} and {:?} (round: {})",
4112 session_id, hex::encode(&peer_a[..8]), hex::encode(&peer_b[..8]), round);
4113
4114 Ok(session_id)
4115 }
4116
4117 pub(crate) fn relay_coordination_frame(
4122 &mut self,
4123 session_id: CoordinationSessionId,
4124 from_peer: PeerId,
4125 frame: &crate::frame::PunchMeNow,
4126 _now: Instant,
4127 ) -> Result<Option<(PeerId, crate::frame::PunchMeNow)>, NatTraversalError> {
4128 let session = self.coordination_sessions.get_mut(&session_id)
4129 .ok_or(NatTraversalError::NoActiveCoordination)?;
4130
4131 if session.peer_a != from_peer && session.peer_b != from_peer {
4133 return Err(NatTraversalError::SuspiciousCoordination);
4134 }
4135
4136 let target_peer = if session.peer_a == from_peer {
4138 session.peer_b
4139 } else {
4140 session.peer_a
4141 };
4142
4143 let target_record = self.peer_registry.get(&target_peer)
4145 .ok_or(NatTraversalError::UnknownCandidate)?;
4146
4147 if session.peer_a == from_peer {
4149 session.sync_state.peer_a_ready = true;
4150 } else {
4151 session.sync_state.peer_b_ready = true;
4152 }
4153
4154 let relay_frame = crate::frame::PunchMeNow {
4156 round: frame.round,
4157 target_sequence: frame.target_sequence,
4158 local_address: target_record.observed_address,
4159 target_peer_id: Some(from_peer),
4160 };
4161
4162 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4164 session.phase = CoordinationPhase::Coordinating;
4165 info!("Coordination phase complete for session {} - both peers ready", session_id);
4166 }
4167
4168 debug!("Relaying coordination frame from {:?} to {:?} in session {}",
4169 hex::encode(&from_peer[..8]), hex::encode(&target_peer[..8]), session_id);
4170
4171 Ok(Some((target_peer, relay_frame)))
4172 }
4173
4174 pub(crate) fn advance_coordination_round(
4179 &mut self,
4180 session_id: CoordinationSessionId,
4181 now: Instant,
4182 ) -> Result<CoordinationPhase, NatTraversalError> {
4183 let session = self.coordination_sessions.get_mut(&session_id)
4184 .ok_or(NatTraversalError::NoActiveCoordination)?;
4185
4186 let previous_phase = session.phase;
4187
4188 match session.phase {
4190 CoordinationPhase::Requesting => {
4191 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4193 session.phase = CoordinationPhase::Coordinating;
4194 debug!("Session {} advanced to Coordinating phase", session_id);
4195 }
4196 }
4197 CoordinationPhase::Coordinating => {
4198 let coordination_delay = Duration::from_millis(200); let punch_time = now + coordination_delay;
4201
4202 session.phase = CoordinationPhase::Preparing;
4203 debug!("Session {} advanced to Preparing phase, punch time: {:?}",
4204 session_id, punch_time);
4205 }
4206 CoordinationPhase::Preparing => {
4207 session.phase = CoordinationPhase::Punching;
4209 debug!("Session {} advanced to Punching phase", session_id);
4210 }
4211 CoordinationPhase::Punching => {
4212 session.phase = CoordinationPhase::Validating;
4214 debug!("Session {} advanced to Validating phase", session_id);
4215 }
4216 CoordinationPhase::Validating => {
4217 let validation_timeout = Duration::from_secs(5);
4219 if now.duration_since(session.started_at) > validation_timeout {
4220 session.phase = CoordinationPhase::Failed;
4221 debug!("Session {} timed out in validation", session_id);
4222 }
4223 }
4224 CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
4225 }
4227 CoordinationPhase::Idle => {
4228 session.phase = CoordinationPhase::Requesting;
4230 }
4231 }
4232
4233 if session.phase != previous_phase {
4235 match session.phase {
4236 CoordinationPhase::Succeeded => {
4237 session.stats.successful_coordinations += 1;
4238 self.stats.successful_coordinations += 1;
4239 }
4240 CoordinationPhase::Failed => {
4241 }
4243 _ => {}
4244 }
4245 }
4246
4247 Ok(session.phase)
4248 }
4249
4250 pub(crate) fn get_coordination_session(&self, session_id: CoordinationSessionId) -> Option<&CoordinationSession> {
4252 self.coordination_sessions.get(&session_id)
4253 }
4254
4255 pub(crate) fn get_coordination_session_mut(&mut self, session_id: CoordinationSessionId) -> Option<&mut CoordinationSession> {
4257 self.coordination_sessions.get_mut(&session_id)
4258 }
4259
4260 pub(crate) fn mark_coordination_success(
4262 &mut self,
4263 session_id: CoordinationSessionId,
4264 _now: Instant,
4265 ) -> Result<(), NatTraversalError> {
4266 let session = self.coordination_sessions.get_mut(&session_id)
4267 .ok_or(NatTraversalError::NoActiveCoordination)?;
4268
4269 session.phase = CoordinationPhase::Succeeded;
4270 session.stats.successful_coordinations += 1;
4271 self.stats.successful_coordinations += 1;
4272
4273 if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4275 peer_a_record.coordination_count += 1;
4276 peer_a_record.success_rate = (peer_a_record.success_rate * (peer_a_record.coordination_count - 1) as f64 + 1.0) / peer_a_record.coordination_count as f64;
4277 }
4278
4279 if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4280 peer_b_record.coordination_count += 1;
4281 peer_b_record.success_rate = (peer_b_record.success_rate * (peer_b_record.coordination_count - 1) as f64 + 1.0) / peer_b_record.coordination_count as f64;
4282 }
4283
4284 info!("Coordination session {} marked as successful", session_id);
4285 Ok(())
4286 }
4287
4288 pub(crate) fn mark_coordination_failure(
4290 &mut self,
4291 session_id: CoordinationSessionId,
4292 reason: &str,
4293 _now: Instant,
4294 ) -> Result<(), NatTraversalError> {
4295 let session = self.coordination_sessions.get_mut(&session_id)
4296 .ok_or(NatTraversalError::NoActiveCoordination)?;
4297
4298 session.phase = CoordinationPhase::Failed;
4299
4300 if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4302 peer_a_record.coordination_count += 1;
4303 peer_a_record.success_rate = (peer_a_record.success_rate * (peer_a_record.coordination_count - 1) as f64) / peer_a_record.coordination_count as f64;
4304 }
4305
4306 if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4307 peer_b_record.coordination_count += 1;
4308 peer_b_record.success_rate = (peer_b_record.success_rate * (peer_b_record.coordination_count - 1) as f64) / peer_b_record.coordination_count as f64;
4309 }
4310
4311 warn!("Coordination session {} failed: {}", session_id, reason);
4312 Ok(())
4313 }
4314
4315 pub(crate) fn get_peer_record(&self, peer_id: PeerId) -> Option<&PeerObservationRecord> {
4317 self.peer_registry.get(&peer_id)
4318 }
4319
4320}
4321
4322impl Default for BootstrapConfig {
4323 fn default() -> Self {
4324 Self {
4325 _unused: (),
4326 }
4327 }
4328}
4329
4330#[derive(Debug)]
4336#[allow(dead_code)] pub(super) struct MultiDestinationTransmitter {
4338 active_targets: Vec<MultiDestPunchTarget>,
4340 stats: MultiDestTransmissionStats,
4342 max_targets: usize,
4344 rate_limiter: TransmissionRateLimiter,
4346 target_selector: AdaptiveTargetSelector,
4348 performance_monitor: TransmissionPerformanceMonitor,
4350}
4351
4352
4353#[derive(Debug, Default, Clone)]
4355pub(super) struct MultiDestTransmissionStats {
4356 _unused: (),
4357}
4358
4359#[derive(Debug)]
4361struct TransmissionRateLimiter {
4362 _unused: (),
4363}
4364
4365
4366#[derive(Debug)]
4368struct AdaptiveTargetSelector {
4369 _unused: (),
4370}
4371
4372
4373#[derive(Debug)]
4375struct TransmissionPerformanceMonitor {
4376 _unused: (),
4377}
4378
4379impl MultiDestinationTransmitter {
4380 pub(super) fn new() -> Self {
4382 Self {
4383 active_targets: Vec::new(),
4384 stats: MultiDestTransmissionStats::default(),
4385 max_targets: 8, rate_limiter: TransmissionRateLimiter::new(100, 50), target_selector: AdaptiveTargetSelector::new(),
4388 performance_monitor: TransmissionPerformanceMonitor::new(),
4389 }
4390 }
4391
4392
4393
4394
4395
4396
4397
4398
4399}
4400
4401
4402
4403impl TransmissionRateLimiter {
4404 fn new(_max_pps: u64, _burst_size: u64) -> Self {
4405 Self {
4406 _unused: (),
4407 }
4408 }
4409
4410}
4411
4412
4413impl AdaptiveTargetSelector {
4414 fn new() -> Self {
4415 Self {
4416 _unused: (),
4417 }
4418 }
4419
4420
4421}
4422
4423impl TransmissionPerformanceMonitor {
4424 fn new() -> Self {
4425 Self {
4426 _unused: (),
4427 }
4428 }
4429
4430}
4431
4432