1use std::{
2 collections::{HashMap, VecDeque},
3 net::{IpAddr, SocketAddr, Ipv4Addr, Ipv6Addr},
4 time::Duration,
5};
6
7use tracing::{trace, debug, warn, info};
8use crate::shared::ConnectionId;
9
10use crate::{
11 Instant, VarInt,
12};
13
14#[derive(Debug)]
19pub(super) struct NatTraversalState {
20 pub(super) role: NatTraversalRole,
22 pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
24 pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
26 pub(super) candidate_pairs: Vec<CandidatePair>,
28 pub(super) pair_index: HashMap<SocketAddr, usize>,
30 pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
32 pub(super) coordination: Option<CoordinationState>,
34 pub(super) next_sequence: VarInt,
36 pub(super) max_candidates: u32,
38 pub(super) coordination_timeout: Duration,
40 pub(super) stats: NatTraversalStats,
42 pub(super) security_state: SecurityValidationState,
44 pub(super) network_monitor: NetworkConditionMonitor,
46 pub(super) resource_manager: ResourceCleanupCoordinator,
48 pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
50 #[allow(dead_code)] pub(super) multi_dest_transmitter: MultiDestinationTransmitter,
53}
54
/// Role an endpoint plays in NAT traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NatTraversalRole {
    /// Client role.
    Client,
    /// Server role.
    Server {
        /// Whether this server can relay.
        can_relay: bool,
    },
    /// Bootstrap role.
    Bootstrap,
}
65
66#[derive(Debug, Clone)]
68pub(super) struct AddressCandidate {
69 pub(super) address: SocketAddr,
71 pub(super) priority: u32,
73 pub(super) source: CandidateSource,
75 pub(super) discovered_at: Instant,
77 pub(super) state: CandidateState,
79 pub(super) attempt_count: u32,
81 pub(super) last_attempt: Option<Instant>,
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub enum CandidateSource {
88 Local,
90 Observed { by_node: Option<VarInt> },
92 Peer,
94 Predicted,
96}
97
/// Lifecycle state of an address candidate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateState {
    /// Newly added.
    New,
    /// Validation in progress.
    Validating,
    /// Validated.
    Valid,
    /// Validation failed.
    Failed,
    /// Removed.
    Removed,
}
112
113#[derive(Debug)]
115pub(super) struct PathValidationState {
116 pub(super) challenge: u64,
118 pub(super) sent_at: Instant,
120 pub(super) retry_count: u32,
122 pub(super) max_retries: u32,
124 #[allow(dead_code)] pub(super) coordination_round: Option<VarInt>,
127 pub(super) timeout_state: AdaptiveTimeoutState,
129 pub(super) last_retry_at: Option<Instant>,
131}
132
133#[derive(Debug)]
135pub(super) struct CoordinationState {
136 pub(super) round: VarInt,
138 pub(super) punch_targets: Vec<PunchTarget>,
140 pub(super) round_start: Instant,
142 pub(super) punch_start: Instant,
144 #[allow(dead_code)] pub(super) round_duration: Duration,
147 pub(super) state: CoordinationPhase,
149 pub(super) punch_request_sent: bool,
151 pub(super) peer_punch_received: bool,
153 pub(super) retry_count: u32,
155 pub(super) max_retries: u32,
157 pub(super) timeout_state: AdaptiveTimeoutState,
159 pub(super) last_retry_at: Option<Instant>,
161}
162
/// Phase of a coordination round.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub(crate) enum CoordinationPhase {
    /// No coordination in progress.
    Idle,
    /// Requesting coordination.
    Requesting,
    /// Coordinating.
    Coordinating,
    /// Preparing.
    Preparing,
    /// Punching.
    Punching,
    /// Validating.
    Validating,
    /// The round succeeded.
    Succeeded,
    /// The round failed.
    Failed,
}
184
185#[derive(Debug, Clone)]
187pub(super) struct PunchTarget {
188 pub(super) remote_addr: SocketAddr,
190 pub(super) remote_sequence: VarInt,
192 pub(super) challenge: u64,
194}
195
196#[derive(Debug, Clone, PartialEq, Eq)]
198pub(super) enum TimeoutAction {
199 RetryDiscovery,
201 RetryCoordination,
203 StartValidation,
205 Complete,
207 Failed,
209}
210
211#[derive(Debug, Clone)]
213#[allow(dead_code)] pub(super) struct MultiDestPunchTarget {
215 pub destination: SocketAddr,
217 pub local_addr: SocketAddr,
219 pub pair_type: PairType,
221 pub priority: u32,
223 pub created_at: Instant,
225}
226
227#[derive(Debug, Clone)]
229pub(super) struct CandidatePair {
230 pub(super) remote_sequence: VarInt,
232 pub(super) local_addr: SocketAddr,
234 pub(super) remote_addr: SocketAddr,
236 pub(super) priority: u64,
238 pub(super) state: PairState,
240 pub(super) pair_type: PairType,
242 pub(super) created_at: Instant,
244 #[allow(dead_code)] pub(super) last_check: Option<Instant>,
247}
248
249#[derive(Debug, Clone, Copy, PartialEq, Eq)]
251pub(super) enum PairState {
252 Waiting,
254 Succeeded,
256 #[allow(dead_code)] Failed,
259 Frozen,
261}
262
263#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
265pub(super) enum PairType {
266 HostToHost,
268 HostToServerReflexive,
270 ServerReflexiveToHost,
272 ServerReflexiveToServerReflexive,
274 PeerReflexive,
276}
277
278#[derive(Debug, Clone, Copy, PartialEq, Eq)]
280pub(super) enum CandidateType {
281 Host,
283 ServerReflexive,
285 PeerReflexive,
287}
288
289fn calculate_candidate_priority(
292 candidate_type: CandidateType,
293 local_preference: u16,
294 component_id: u8,
295) -> u32 {
296 let type_preference = match candidate_type {
297 CandidateType::Host => 126,
298 CandidateType::PeerReflexive => 110,
299 CandidateType::ServerReflexive => 100,
300 };
301
302 (1u32 << 24) * type_preference
304 + (1u32 << 8) * local_preference as u32
305 + component_id as u32
306}
307
/// Computes the priority of a candidate pair from its two candidate priorities.
///
/// The result is `2^32 * min(G, D) + 2 * max(G, D) + (G > D ? 1 : 0)`, where
/// `G` is `local_priority` and `D` is `remote_priority`.
///
/// The additions saturate at `u64::MAX`: when both priorities are close to
/// `u32::MAX` the exact sum does not fit in a `u64`, and plain `+` would panic
/// in debug builds and wrap in release builds.
fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
    let g = u64::from(local_priority);
    let d = u64::from(remote_priority);
    let tie_breaker = u64::from(g > d);

    // `min << 32` and `2 * max` each fit in a u64; only their sum can overflow.
    (g.min(d) << 32)
        .saturating_add(2 * g.max(d))
        .saturating_add(tie_breaker)
}
317
318fn classify_candidate_type(source: CandidateSource) -> CandidateType {
320 match source {
321 CandidateSource::Local => CandidateType::Host,
322 CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
323 CandidateSource::Peer => CandidateType::PeerReflexive,
324 CandidateSource::Predicted => CandidateType::ServerReflexive, }
326}
327
328fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
330 match (local_type, remote_type) {
331 (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
332 (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
333 (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
334 (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => PairType::ServerReflexiveToServerReflexive,
335 (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => PairType::PeerReflexive,
336 }
337}
338
339fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
341 match (local.address, remote.address) {
343 (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
344 (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
345 _ => false, }
347}
348
349#[derive(Debug, Default, Clone)]
351pub(crate) struct NatTraversalStats {
352 pub(super) remote_candidates_received: u32,
354 pub(super) local_candidates_sent: u32,
356 pub(super) validations_succeeded: u32,
358 #[allow(dead_code)] pub(super) validations_failed: u32,
361 pub(super) coordination_rounds: u32,
363 #[allow(dead_code)] pub(super) successful_coordinations: u32,
366 #[allow(dead_code)] pub(super) failed_coordinations: u32,
369 #[allow(dead_code)] pub(super) timed_out_coordinations: u32,
372 pub(super) coordination_failures: u32,
374 pub(super) direct_connections: u32,
376 pub(super) security_rejections: u32,
378 pub(super) rate_limit_violations: u32,
380 pub(super) invalid_address_rejections: u32,
382 pub(super) suspicious_coordination_attempts: u32,
384}
385
386#[derive(Debug)]
388pub(super) struct SecurityValidationState {
389 candidate_rate_tracker: VecDeque<Instant>,
391 max_candidates_per_window: u32,
393 rate_window: Duration,
395 coordination_requests: VecDeque<CoordinationRequest>,
397 max_coordination_per_window: u32,
399 address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
401 #[allow(dead_code)] validation_cache_timeout: Duration,
404}
405
/// Record of one accepted coordination request, kept for rate limiting.
#[derive(Debug, Clone)]
struct CoordinationRequest {
    /// When the request was accepted.
    timestamp: Instant,
}
412
413
/// Verdict produced by address validation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressValidationResult {
    /// The address passed every check.
    Valid,
    /// The address is rejected as invalid.
    Invalid,
    /// The address is rejected as suspicious.
    Suspicious,
}
424
425#[derive(Debug, Clone)]
427pub(super) struct AdaptiveTimeoutState {
428 current_timeout: Duration,
430 min_timeout: Duration,
432 max_timeout: Duration,
434 base_timeout: Duration,
436 backoff_multiplier: f64,
438 max_backoff_multiplier: f64,
440 jitter_factor: f64,
442 srtt: Option<Duration>,
444 rttvar: Option<Duration>,
446 last_rtt: Option<Duration>,
448 consecutive_timeouts: u32,
450 successful_responses: u32,
452}
453
454#[derive(Debug)]
456pub(super) struct NetworkConditionMonitor {
457 rtt_samples: VecDeque<Duration>,
459 max_samples: usize,
461 packet_loss_rate: f64,
463 #[allow(dead_code)] congestion_window: u32,
466 quality_score: f64,
468 last_quality_update: Instant,
470 quality_update_interval: Duration,
472 timeout_stats: TimeoutStatistics,
474}
475
/// Aggregate timeout and response statistics; every field defaults to zero or `None`.
#[derive(Debug, Default)]
struct TimeoutStatistics {
    /// Total number of timeouts.
    total_timeouts: u64,
    /// Total number of responses.
    total_responses: u64,
    /// Average response time.
    avg_response_time: Duration,
    /// Timeout rate.
    timeout_rate: f64,
    /// Time of the last update, if any.
    last_update: Option<Instant>,
}
490
491impl SecurityValidationState {
492 fn new() -> Self {
494 Self {
495 candidate_rate_tracker: VecDeque::new(),
496 max_candidates_per_window: 20, rate_window: Duration::from_secs(60),
498 coordination_requests: VecDeque::new(),
499 max_coordination_per_window: 5, address_validation_cache: HashMap::new(),
501 validation_cache_timeout: Duration::from_secs(300), }
503 }
504
505 #[allow(dead_code)] fn new_with_limits(
508 max_candidates_per_window: u32,
509 max_coordination_per_window: u32,
510 rate_window: Duration,
511 ) -> Self {
512 Self {
513 candidate_rate_tracker: VecDeque::new(),
514 max_candidates_per_window,
515 rate_window,
516 coordination_requests: VecDeque::new(),
517 max_coordination_per_window,
518 address_validation_cache: HashMap::new(),
519 validation_cache_timeout: Duration::from_secs(300),
520 }
521 }
522
523 fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
528 self.cleanup_rate_tracker(now);
530 self.cleanup_coordination_tracker(now);
531
532 let _current_candidate_rate = self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
534 let _current_coordination_rate = self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
535
536 let peer_reputation = self.calculate_peer_reputation(peer_id);
538 let adaptive_candidate_limit = (self.max_candidates_per_window as f64 * peer_reputation) as u32;
539 let adaptive_coordination_limit = (self.max_coordination_per_window as f64 * peer_reputation) as u32;
540
541 if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
543 debug!("Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
544 hex::encode(&peer_id[..8]), self.candidate_rate_tracker.len(), adaptive_candidate_limit);
545 return true;
546 }
547
548 if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
549 debug!("Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
550 hex::encode(&peer_id[..8]), self.coordination_requests.len(), adaptive_coordination_limit);
551 return true;
552 }
553
554 false
555 }
556
557 fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
562 1.0
572 }
573
574 fn validate_amplification_limits(
579 &mut self,
580 source_addr: SocketAddr,
581 target_addr: SocketAddr,
582 now: Instant,
583 ) -> Result<(), NatTraversalError> {
584 let amplification_key = (source_addr, target_addr);
586
587 if self.is_amplification_suspicious(amplification_key, now) {
596 warn!("Potential amplification attack detected: {} -> {}", source_addr, target_addr);
597 return Err(NatTraversalError::SuspiciousCoordination);
598 }
599
600 Ok(())
601 }
602
603 fn is_amplification_suspicious(&self, _amplification_key: (SocketAddr, SocketAddr), _now: Instant) -> bool {
605 false
615 }
616
617 #[allow(dead_code)] fn generate_secure_coordination_round(&self) -> VarInt {
623 let secure_random: u64 = rand::random();
625
626 let bounded_random = secure_random % 1000000; VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
630 }
631
632 fn enhanced_address_validation(
640 &mut self,
641 addr: SocketAddr,
642 source_addr: SocketAddr,
643 now: Instant,
644 ) -> Result<AddressValidationResult, NatTraversalError> {
645 let basic_result = self.validate_address(addr, now);
647
648 match basic_result {
649 AddressValidationResult::Invalid => {
650 return Err(NatTraversalError::InvalidAddress);
651 }
652 AddressValidationResult::Suspicious => {
653 return Err(NatTraversalError::SuspiciousCoordination);
654 }
655 AddressValidationResult::Valid => {
656 }
658 }
659
660 self.validate_amplification_limits(source_addr, addr, now)?;
662
663 if self.is_address_in_suspicious_range(addr) {
665 warn!("Address in suspicious range detected: {}", addr);
666 return Err(NatTraversalError::SuspiciousCoordination);
667 }
668
669 if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
670 warn!("Suspicious coordination pattern detected: {} -> {}", source_addr, addr);
671 return Err(NatTraversalError::SuspiciousCoordination);
672 }
673
674 Ok(AddressValidationResult::Valid)
675 }
676
677 fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
679 match addr.ip() {
680 IpAddr::V4(ipv4) => {
681 let octets = ipv4.octets();
683
684 if octets[0] == 0 || octets[0] == 127 {
686 return true;
687 }
688
689 if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
691 return true;
692 }
693 if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
694 return true;
695 }
696 if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
697 return true;
698 }
699
700 false
701 }
702 IpAddr::V6(ipv6) => {
703 if ipv6.is_loopback() || ipv6.is_unspecified() {
705 return true;
706 }
707
708 let segments = ipv6.segments();
710 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
711 return true;
712 }
713
714 false
715 }
716 }
717 }
718
719 fn is_coordination_pattern_suspicious(
721 &self,
722 _source_addr: SocketAddr,
723 _target_addr: SocketAddr,
724 _now: Instant,
725 ) -> bool {
726 false
736 }
737
738 fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
740 self.cleanup_rate_tracker(now);
742
743 if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
745 return true;
746 }
747
748 self.candidate_rate_tracker.push_back(now);
750 false
751 }
752
753 fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
755 self.cleanup_coordination_tracker(now);
757
758 if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
760 return true;
761 }
762
763 let request = CoordinationRequest {
765 timestamp: now,
766 };
767 self.coordination_requests.push_back(request);
768 false
769 }
770
771 fn cleanup_rate_tracker(&mut self, now: Instant) {
773 let cutoff = now - self.rate_window;
774 while let Some(&front_time) = self.candidate_rate_tracker.front() {
775 if front_time < cutoff {
776 self.candidate_rate_tracker.pop_front();
777 } else {
778 break;
779 }
780 }
781 }
782
783 fn cleanup_coordination_tracker(&mut self, now: Instant) {
785 let cutoff = now - self.rate_window;
786 while let Some(front_request) = self.coordination_requests.front() {
787 if front_request.timestamp < cutoff {
788 self.coordination_requests.pop_front();
789 } else {
790 break;
791 }
792 }
793 }
794
795 fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
797 if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
799 return cached_result;
800 }
801
802 let result = self.perform_address_validation(addr);
803
804 self.address_validation_cache.insert(addr, result);
806
807 if self.address_validation_cache.len() > 1000 {
809 self.cleanup_address_cache(now);
810 }
811
812 result
813 }
814
815 fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
817 match addr.ip() {
818 IpAddr::V4(ipv4) => {
819 if ipv4.is_unspecified() || ipv4.is_broadcast() {
821 return AddressValidationResult::Invalid;
822 }
823
824 if ipv4.is_multicast() || ipv4.is_documentation() {
826 return AddressValidationResult::Suspicious;
827 }
828
829 if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
831 return AddressValidationResult::Invalid;
832 }
833
834 if self.is_suspicious_ipv4(ipv4) {
836 return AddressValidationResult::Suspicious;
837 }
838 }
839 IpAddr::V6(ipv6) => {
840 if ipv6.is_unspecified() || ipv6.is_multicast() {
842 return AddressValidationResult::Invalid;
843 }
844
845 if self.is_suspicious_ipv6(ipv6) {
847 return AddressValidationResult::Suspicious;
848 }
849 }
850 }
851
852 if addr.port() == 0 || addr.port() < 1024 {
854 return AddressValidationResult::Suspicious;
855 }
856
857 AddressValidationResult::Valid
858 }
859
860 fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
862 let octets = ipv4.octets();
863
864 if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
867 return true;
868 }
869
870 false
873 }
874
875 fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
877 let segments = ipv6.segments();
878
879 if segments.iter().all(|&s| s == segments[0]) {
881 return true;
882 }
883
884 false
885 }
886
887 fn cleanup_address_cache(&mut self, _now: Instant) {
889 if self.address_validation_cache.len() > 500 {
892 let keys_to_remove: Vec<_> = self.address_validation_cache
893 .keys()
894 .take(self.address_validation_cache.len() / 2)
895 .copied()
896 .collect();
897
898 for key in keys_to_remove {
899 self.address_validation_cache.remove(&key);
900 }
901 }
902 }
903
904 fn validate_punch_me_now_frame(
912 &mut self,
913 frame: &crate::frame::PunchMeNow,
914 source_addr: SocketAddr,
915 peer_id: [u8; 32],
916 now: Instant
917 ) -> Result<(), NatTraversalError> {
918 if self.is_coordination_rate_limited(now) {
920 debug!("PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}", hex::encode(&peer_id[..8]));
921 return Err(NatTraversalError::RateLimitExceeded);
922 }
923
924 let addr_validation = self.validate_address(frame.local_address, now);
926 match addr_validation {
927 AddressValidationResult::Invalid => {
928 debug!("PUNCH_ME_NOW frame rejected: invalid local_address {:?} from peer {:?}",
929 frame.local_address, hex::encode(&peer_id[..8]));
930 return Err(NatTraversalError::InvalidAddress);
931 }
932 AddressValidationResult::Suspicious => {
933 debug!("PUNCH_ME_NOW frame rejected: suspicious local_address {:?} from peer {:?}",
934 frame.local_address, hex::encode(&peer_id[..8]));
935 return Err(NatTraversalError::SuspiciousCoordination);
936 }
937 AddressValidationResult::Valid => {
938 }
940 }
941
942 if !self.validate_address_consistency(frame.local_address, source_addr) {
945 debug!("PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
946 frame.local_address, source_addr);
947 return Err(NatTraversalError::SuspiciousCoordination);
948 }
949
950 if !self.validate_coordination_parameters(frame) {
952 debug!("PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
953 hex::encode(&peer_id[..8]));
954 return Err(NatTraversalError::SuspiciousCoordination);
955 }
956
957 if let Some(target_peer_id) = frame.target_peer_id {
959 if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
960 debug!("PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
961 hex::encode(&peer_id[..8]), hex::encode(&target_peer_id[..8]));
962 return Err(NatTraversalError::SuspiciousCoordination);
963 }
964 }
965
966 if !self.validate_resource_limits(frame) {
968 debug!("PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
969 hex::encode(&peer_id[..8]));
970 return Err(NatTraversalError::ResourceLimitExceeded);
971 }
972
973 debug!("PUNCH_ME_NOW frame validation passed for peer {:?}", hex::encode(&peer_id[..8]));
974 Ok(())
975 }
976
977 fn validate_address_consistency(&self, claimed_addr: SocketAddr, observed_addr: SocketAddr) -> bool {
982 match (claimed_addr.ip(), observed_addr.ip()) {
987 (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
988 if claimed_ip == observed_ip {
990 return true;
991 }
992
993 if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
995 return true;
996 }
997
998 !claimed_ip.is_private() && !observed_ip.is_private()
1001 }
1002 (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
1003 claimed_ip == observed_ip ||
1005 self.are_in_same_prefix_v6(claimed_ip, observed_ip)
1006 }
1007 _ => {
1008 false
1010 }
1011 }
1012 }
1013
1014 fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
1016 let ip1_octets = ip1.octets();
1018 let ip2_octets = ip2.octets();
1019
1020 if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
1022 return true;
1023 }
1024
1025 if ip1_octets[0] == 172 && ip2_octets[0] == 172 &&
1027 (16..=31).contains(&ip1_octets[1]) && (16..=31).contains(&ip2_octets[1]) {
1028 return true;
1029 }
1030
1031 if ip1_octets[0] == 192 && ip1_octets[1] == 168 &&
1033 ip2_octets[0] == 192 && ip2_octets[1] == 168 {
1034 return true;
1035 }
1036
1037 false
1038 }
1039
1040 fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1042 let segments1 = ip1.segments();
1044 let segments2 = ip2.segments();
1045
1046 segments1[0] == segments2[0] &&
1047 segments1[1] == segments2[1] &&
1048 segments1[2] == segments2[2] &&
1049 segments1[3] == segments2[3]
1050 }
1051
1052 fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1054 if frame.round.into_inner() > 1000000 {
1056 return false;
1057 }
1058
1059 if frame.target_sequence.into_inner() > 10000 {
1061 return false;
1062 }
1063
1064 match frame.local_address.ip() {
1066 IpAddr::V4(ipv4) => {
1067 !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1069 }
1070 IpAddr::V6(ipv6) => {
1071 !ipv6.is_unspecified() && !ipv6.is_multicast()
1073 }
1074 }
1075 }
1076
1077 fn validate_target_peer_request(
1079 &self,
1080 requesting_peer: [u8; 32],
1081 target_peer: [u8; 32],
1082 _frame: &crate::frame::PunchMeNow
1083 ) -> bool {
1084 if requesting_peer == target_peer {
1086 return false;
1087 }
1088
1089 true
1095 }
1096
1097 fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1099 self.coordination_requests.len() < self.max_coordination_per_window as usize
1108 }
1109}
1110
1111impl AdaptiveTimeoutState {
1112 pub(crate) fn new() -> Self {
1114 let base_timeout = Duration::from_millis(1000); Self {
1116 current_timeout: base_timeout,
1117 min_timeout: Duration::from_millis(100),
1118 max_timeout: Duration::from_secs(30),
1119 base_timeout,
1120 backoff_multiplier: 1.0,
1121 max_backoff_multiplier: 8.0,
1122 jitter_factor: 0.1, srtt: None,
1124 rttvar: None,
1125 last_rtt: None,
1126 consecutive_timeouts: 0,
1127 successful_responses: 0,
1128 }
1129 }
1130
1131 fn update_success(&mut self, rtt: Duration) {
1133 self.last_rtt = Some(rtt);
1134 self.successful_responses += 1;
1135 self.consecutive_timeouts = 0;
1136
1137 match self.srtt {
1139 None => {
1140 self.srtt = Some(rtt);
1141 self.rttvar = Some(rtt / 2);
1142 }
1143 Some(srtt) => {
1144 let rttvar = self.rttvar.unwrap_or(rtt / 2);
1145 let abs_diff = if rtt > srtt { rtt - srtt } else { srtt - rtt };
1146
1147 self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1148 self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1149 }
1150 }
1151
1152 self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1154
1155 self.calculate_current_timeout();
1157 }
1158
1159 fn update_timeout(&mut self) {
1161 self.consecutive_timeouts += 1;
1162
1163 self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1165
1166 self.calculate_current_timeout();
1168 }
1169
1170 fn calculate_current_timeout(&mut self) {
1172 let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1173 srtt + rttvar * 4
1175 } else {
1176 self.base_timeout
1177 };
1178
1179 let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1181
1182 let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1184 let timeout = timeout.mul_f64(jitter);
1185
1186 self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1188 }
1189
1190 fn get_timeout(&self) -> Duration {
1192 self.current_timeout
1193 }
1194
1195 fn should_retry(&self, max_retries: u32) -> bool {
1197 self.consecutive_timeouts < max_retries
1198 }
1199
1200 fn get_retry_delay(&self) -> Duration {
1202 let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1203 delay.clamp(self.min_timeout, self.max_timeout)
1204 }
1205}
1206
1207#[derive(Debug)]
1209pub(super) struct ResourceManagementConfig {
1210 max_active_validations: usize,
1212 max_local_candidates: usize,
1214 max_remote_candidates: usize,
1216 max_candidate_pairs: usize,
1218 #[allow(dead_code)] max_coordination_history: usize,
1221 cleanup_interval: Duration,
1223 candidate_timeout: Duration,
1225 validation_timeout: Duration,
1227 coordination_timeout: Duration,
1229 memory_pressure_threshold: f64,
1231 aggressive_cleanup_threshold: f64,
1233}
1234
1235#[derive(Debug, Default)]
1237pub(super) struct ResourceStats {
1238 active_validations: usize,
1240 local_candidates: usize,
1242 remote_candidates: usize,
1244 candidate_pairs: usize,
1246 peak_memory_usage: usize,
1248 cleanup_operations: u64,
1250 resources_cleaned: u64,
1252 allocation_failures: u64,
1254 #[allow(dead_code)] last_cleanup: Option<Instant>,
1257 memory_pressure: f64,
1259}
1260
1261#[derive(Debug)]
1263pub(super) struct ResourceCleanupCoordinator {
1264 config: ResourceManagementConfig,
1266 stats: ResourceStats,
1268 last_cleanup: Option<Instant>,
1270 cleanup_counter: u64,
1272 shutdown_requested: bool,
1274}
1275
1276impl ResourceManagementConfig {
1277 fn new() -> Self {
1279 Self {
1280 max_active_validations: 100,
1281 max_local_candidates: 50,
1282 max_remote_candidates: 100,
1283 max_candidate_pairs: 200,
1284 max_coordination_history: 10,
1285 cleanup_interval: Duration::from_secs(30),
1286 candidate_timeout: Duration::from_secs(300), validation_timeout: Duration::from_secs(30),
1288 coordination_timeout: Duration::from_secs(60),
1289 memory_pressure_threshold: 0.75,
1290 aggressive_cleanup_threshold: 0.90,
1291 }
1292 }
1293
1294 #[allow(dead_code)] fn low_memory() -> Self {
1297 Self {
1298 max_active_validations: 25,
1299 max_local_candidates: 10,
1300 max_remote_candidates: 25,
1301 max_candidate_pairs: 50,
1302 max_coordination_history: 3,
1303 cleanup_interval: Duration::from_secs(15),
1304 candidate_timeout: Duration::from_secs(180), validation_timeout: Duration::from_secs(20),
1306 coordination_timeout: Duration::from_secs(30),
1307 memory_pressure_threshold: 0.60,
1308 aggressive_cleanup_threshold: 0.80,
1309 }
1310 }
1311}
1312
1313impl ResourceCleanupCoordinator {
1314 fn new() -> Self {
1316 Self {
1317 config: ResourceManagementConfig::new(),
1318 stats: ResourceStats::default(),
1319 last_cleanup: None,
1320 cleanup_counter: 0,
1321 shutdown_requested: false,
1322 }
1323 }
1324
1325 #[allow(dead_code)] fn low_memory() -> Self {
1328 Self {
1329 config: ResourceManagementConfig::low_memory(),
1330 stats: ResourceStats::default(),
1331 last_cleanup: None,
1332 cleanup_counter: 0,
1333 shutdown_requested: false,
1334 }
1335 }
1336
1337 fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1339 state.active_validations.len() > self.config.max_active_validations ||
1340 state.local_candidates.len() > self.config.max_local_candidates ||
1341 state.remote_candidates.len() > self.config.max_remote_candidates ||
1342 state.candidate_pairs.len() > self.config.max_candidate_pairs
1343 }
1344
1345 fn calculate_memory_pressure(&mut self, active_validations_len: usize, local_candidates_len: usize,
1347 remote_candidates_len: usize, candidate_pairs_len: usize) -> f64 {
1348 let total_limit = self.config.max_active_validations +
1349 self.config.max_local_candidates +
1350 self.config.max_remote_candidates +
1351 self.config.max_candidate_pairs;
1352
1353 let current_usage = active_validations_len +
1354 local_candidates_len +
1355 remote_candidates_len +
1356 candidate_pairs_len;
1357
1358 let pressure = current_usage as f64 / total_limit as f64;
1359 self.stats.memory_pressure = pressure;
1360 pressure
1361 }
1362
1363 fn should_cleanup(&self, now: Instant) -> bool {
1365 if self.shutdown_requested {
1366 return true;
1367 }
1368
1369 if let Some(last_cleanup) = self.last_cleanup {
1371 if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1372 return true;
1373 }
1374 } else {
1375 return true; }
1377
1378 if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1380 return true;
1381 }
1382
1383 false
1384 }
1385
1386 #[allow(dead_code)] fn cleanup_expired_resources(&mut self,
1389 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1390 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1391 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1392 candidate_pairs: &mut Vec<CandidatePair>,
1393 coordination: &mut Option<CoordinationState>,
1394 now: Instant) -> u64 {
1395 let mut cleaned = 0;
1396
1397 cleaned += self.cleanup_expired_validations(active_validations, now);
1399
1400 cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1402
1403 cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1405
1406 cleaned += self.cleanup_old_coordination(coordination, now);
1408
1409 self.stats.cleanup_operations += 1;
1411 self.stats.resources_cleaned += cleaned;
1412 self.last_cleanup = Some(now);
1413 self.cleanup_counter += 1;
1414
1415 debug!("Cleaned up {} expired resources", cleaned);
1416 cleaned
1417 }
1418
1419 #[allow(dead_code)] fn cleanup_expired_validations(&mut self, active_validations: &mut HashMap<SocketAddr, PathValidationState>, now: Instant) -> u64 {
1422 let mut cleaned = 0;
1423 let validation_timeout = self.config.validation_timeout;
1424
1425 active_validations.retain(|_addr, validation| {
1426 let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1427 if is_expired {
1428 cleaned += 1;
1429 trace!("Cleaned up expired validation for {:?}", _addr);
1430 }
1431 !is_expired
1432 });
1433
1434 cleaned
1435 }
1436
1437 #[allow(dead_code)] fn cleanup_stale_candidates(&mut self, local_candidates: &mut HashMap<VarInt, AddressCandidate>, remote_candidates: &mut HashMap<VarInt, AddressCandidate>, now: Instant) -> u64 {
1440 let mut cleaned = 0;
1441 let candidate_timeout = self.config.candidate_timeout;
1442
1443 local_candidates.retain(|_seq, candidate| {
1445 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout ||
1446 candidate.state == CandidateState::Failed ||
1447 candidate.state == CandidateState::Removed;
1448 if is_stale {
1449 cleaned += 1;
1450 trace!("Cleaned up stale local candidate {:?}", candidate.address);
1451 }
1452 !is_stale
1453 });
1454
1455 remote_candidates.retain(|_seq, candidate| {
1457 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout ||
1458 candidate.state == CandidateState::Failed ||
1459 candidate.state == CandidateState::Removed;
1460 if is_stale {
1461 cleaned += 1;
1462 trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1463 }
1464 !is_stale
1465 });
1466
1467 cleaned
1468 }
1469
1470 #[allow(dead_code)] fn cleanup_failed_pairs(&mut self, candidate_pairs: &mut Vec<CandidatePair>, now: Instant) -> u64 {
1473 let mut cleaned = 0;
1474 let pair_timeout = self.config.candidate_timeout;
1475
1476 candidate_pairs.retain(|pair| {
1477 let is_stale = now.duration_since(pair.created_at) > pair_timeout ||
1478 pair.state == PairState::Failed;
1479 if is_stale {
1480 cleaned += 1;
1481 trace!("Cleaned up failed candidate pair {:?} -> {:?}", pair.local_addr, pair.remote_addr);
1482 }
1483 !is_stale
1484 });
1485
1486 cleaned
1487 }
1488
1489 #[allow(dead_code)] fn cleanup_old_coordination(&mut self, coordination: &mut Option<CoordinationState>, now: Instant) -> u64 {
1492 let mut cleaned = 0;
1493
1494 if let Some(coord) = coordination {
1495 let is_expired = now.duration_since(coord.round_start) > self.config.coordination_timeout;
1496 let is_failed = coord.state == CoordinationPhase::Failed;
1497
1498 if is_expired || is_failed {
1499 let round = coord.round;
1500 *coordination = None;
1501 cleaned += 1;
1502 trace!("Cleaned up old coordination state for round {}", round);
1503 }
1504 }
1505
1506 cleaned
1507 }
1508
1509 #[allow(dead_code)] fn aggressive_cleanup(&mut self,
1512 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1513 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1514 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1515 candidate_pairs: &mut Vec<CandidatePair>,
1516 now: Instant) -> u64 {
1517 let mut cleaned = 0;
1518
1519 let aggressive_timeout = self.config.candidate_timeout / 2;
1521
1522 local_candidates.retain(|_seq, candidate| {
1524 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout &&
1525 candidate.state != CandidateState::Failed;
1526 if !keep {
1527 cleaned += 1;
1528 }
1529 keep
1530 });
1531
1532 remote_candidates.retain(|_seq, candidate| {
1533 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout &&
1534 candidate.state != CandidateState::Failed;
1535 if !keep {
1536 cleaned += 1;
1537 }
1538 keep
1539 });
1540
1541 candidate_pairs.retain(|pair| {
1543 let keep = pair.state != PairState::Waiting ||
1544 now.duration_since(pair.created_at) <= aggressive_timeout;
1545 if !keep {
1546 cleaned += 1;
1547 }
1548 keep
1549 });
1550
1551 active_validations.retain(|_addr, validation| {
1553 let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1554 if !keep {
1555 cleaned += 1;
1556 }
1557 keep
1558 });
1559
1560 warn!("Aggressive cleanup removed {} resources due to memory pressure", cleaned);
1561 cleaned
1562 }
1563
1564 #[allow(dead_code)] fn request_shutdown(&mut self) {
1567 self.shutdown_requested = true;
1568 debug!("Resource cleanup coordinator shutdown requested");
1569 }
1570
1571 #[allow(dead_code)] fn shutdown_cleanup(&mut self,
1574 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1575 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1576 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1577 candidate_pairs: &mut Vec<CandidatePair>,
1578 coordination: &mut Option<CoordinationState>) -> u64 {
1579 let mut cleaned = 0;
1580
1581 cleaned += active_validations.len() as u64;
1583 active_validations.clear();
1584
1585 cleaned += local_candidates.len() as u64;
1586 local_candidates.clear();
1587
1588 cleaned += remote_candidates.len() as u64;
1589 remote_candidates.clear();
1590
1591 cleaned += candidate_pairs.len() as u64;
1592 candidate_pairs.clear();
1593
1594 if coordination.is_some() {
1595 *coordination = None;
1596 cleaned += 1;
1597 }
1598
1599 info!("Shutdown cleanup removed {} resources", cleaned);
1600 cleaned
1601 }
1602
1603 #[allow(dead_code)] fn get_resource_stats(&self) -> &ResourceStats {
1606 &self.stats
1607 }
1608
1609 fn update_stats(&mut self, active_validations_len: usize, local_candidates_len: usize,
1611 remote_candidates_len: usize, candidate_pairs_len: usize) {
1612 self.stats.active_validations = active_validations_len;
1613 self.stats.local_candidates = local_candidates_len;
1614 self.stats.remote_candidates = remote_candidates_len;
1615 self.stats.candidate_pairs = candidate_pairs_len;
1616
1617 let current_usage = self.stats.active_validations +
1619 self.stats.local_candidates +
1620 self.stats.remote_candidates +
1621 self.stats.candidate_pairs;
1622
1623 if current_usage > self.stats.peak_memory_usage {
1624 self.stats.peak_memory_usage = current_usage;
1625 }
1626 }
1627
1628 pub(super) fn perform_cleanup(&mut self, now: Instant) {
1630 self.last_cleanup = Some(now);
1631 self.cleanup_counter += 1;
1632
1633 self.stats.cleanup_operations += 1;
1635
1636 debug!("Performed resource cleanup #{}", self.cleanup_counter);
1637 }
1638}
1639
1640impl NetworkConditionMonitor {
1641 fn new() -> Self {
1643 Self {
1644 rtt_samples: VecDeque::new(),
1645 max_samples: 20,
1646 packet_loss_rate: 0.0,
1647 congestion_window: 10,
1648 quality_score: 0.8, last_quality_update: Instant::now(),
1650 quality_update_interval: Duration::from_secs(10),
1651 timeout_stats: TimeoutStatistics::default(),
1652 }
1653 }
1654
1655 fn record_success(&mut self, rtt: Duration, now: Instant) {
1657 self.rtt_samples.push_back(rtt);
1659 if self.rtt_samples.len() > self.max_samples {
1660 self.rtt_samples.pop_front();
1661 }
1662
1663 self.timeout_stats.total_responses += 1;
1665 self.update_timeout_stats(now);
1666
1667 self.update_quality_score(now);
1669 }
1670
1671 fn record_timeout(&mut self, now: Instant) {
1673 self.timeout_stats.total_timeouts += 1;
1674 self.update_timeout_stats(now);
1675
1676 self.update_quality_score(now);
1678 }
1679
1680 fn update_timeout_stats(&mut self, now: Instant) {
1682 let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1683
1684 if total_attempts > 0 {
1685 self.timeout_stats.timeout_rate = self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1686 }
1687
1688 if !self.rtt_samples.is_empty() {
1690 let total_rtt: Duration = self.rtt_samples.iter().sum();
1691 self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1692 }
1693
1694 self.timeout_stats.last_update = Some(now);
1695 }
1696
1697 fn update_quality_score(&mut self, now: Instant) {
1699 if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1700 return;
1701 }
1702
1703 let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1705 let rtt_factor = self.calculate_rtt_factor();
1706 let consistency_factor = self.calculate_consistency_factor();
1707
1708 let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1710
1711 self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1713 self.last_quality_update = now;
1714 }
1715
1716 fn calculate_rtt_factor(&self) -> f64 {
1718 if self.rtt_samples.is_empty() {
1719 return 0.5; }
1721
1722 let avg_rtt = self.timeout_stats.avg_response_time;
1723
1724 let rtt_ms = avg_rtt.as_millis() as f64;
1726 let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1727 factor.clamp(0.0, 1.0)
1728 }
1729
1730 fn calculate_consistency_factor(&self) -> f64 {
1732 if self.rtt_samples.len() < 3 {
1733 return 0.5; }
1735
1736 let mean_rtt = self.timeout_stats.avg_response_time;
1738 let variance: f64 = self.rtt_samples
1739 .iter()
1740 .map(|rtt| {
1741 let diff = if *rtt > mean_rtt { *rtt - mean_rtt } else { mean_rtt - *rtt };
1742 diff.as_millis() as f64
1743 })
1744 .map(|diff| diff * diff)
1745 .sum::<f64>() / self.rtt_samples.len() as f64;
1746
1747 let std_dev = variance.sqrt();
1748
1749 let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1751 consistency.clamp(0.0, 1.0)
1752 }
1753
1754 fn get_quality_score(&self) -> f64 {
1756 self.quality_score
1757 }
1758
1759 fn get_estimated_rtt(&self) -> Option<Duration> {
1761 if self.rtt_samples.is_empty() {
1762 return None;
1763 }
1764
1765 Some(self.timeout_stats.avg_response_time)
1766 }
1767
1768 fn is_suitable_for_coordination(&self) -> bool {
1770 self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1772 }
1773
1774 #[allow(dead_code)] fn get_packet_loss_rate(&self) -> f64 {
1777 self.packet_loss_rate
1778 }
1779
1780 #[allow(dead_code)] fn get_timeout_multiplier(&self) -> f64 {
1783 let base_multiplier = 1.0;
1784
1785 let quality_multiplier = if self.quality_score < 0.3 {
1787 2.0 } else if self.quality_score > 0.8 {
1789 0.8 } else {
1791 1.0 };
1793
1794 let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1796
1797 base_multiplier * quality_multiplier * loss_multiplier
1798 }
1799
1800 #[allow(dead_code)] fn cleanup(&mut self, now: Instant) {
1803 let _cutoff_time = now - Duration::from_secs(60);
1805
1806 if let Some(last_update) = self.timeout_stats.last_update {
1808 if now.duration_since(last_update) > Duration::from_secs(300) {
1809 self.timeout_stats = TimeoutStatistics::default();
1810 }
1811 }
1812 }
1813}
1814
1815impl NatTraversalState {
1816 pub(super) fn new(
1818 role: NatTraversalRole,
1819 max_candidates: u32,
1820 coordination_timeout: Duration,
1821 ) -> Self {
1822 let bootstrap_coordinator = if matches!(role, NatTraversalRole::Bootstrap) {
1823 Some(BootstrapCoordinator::new(BootstrapConfig::default()))
1824 } else {
1825 None
1826 };
1827
1828 Self {
1829 role,
1830 local_candidates: HashMap::new(),
1831 remote_candidates: HashMap::new(),
1832 candidate_pairs: Vec::new(),
1833 pair_index: HashMap::new(),
1834 active_validations: HashMap::new(),
1835 coordination: None,
1836 next_sequence: VarInt::from_u32(1),
1837 max_candidates,
1838 coordination_timeout,
1839 stats: NatTraversalStats::default(),
1840 security_state: SecurityValidationState::new(),
1841 network_monitor: NetworkConditionMonitor::new(),
1842 resource_manager: ResourceCleanupCoordinator::new(),
1843 bootstrap_coordinator,
1844 multi_dest_transmitter: MultiDestinationTransmitter::new(),
1845 }
1846 }
1847
1848 pub(super) fn add_remote_candidate(
1850 &mut self,
1851 sequence: VarInt,
1852 address: SocketAddr,
1853 priority: VarInt,
1854 now: Instant,
1855 ) -> Result<(), NatTraversalError> {
1856 if self.should_reject_new_resources(now) {
1858 debug!("Rejecting new candidate due to resource limits: {}", address);
1859 return Err(NatTraversalError::ResourceLimitExceeded);
1860 }
1861
1862 if self.security_state.is_candidate_rate_limited(now) {
1864 self.stats.rate_limit_violations += 1;
1865 debug!("Rate limit exceeded for candidate addition: {}", address);
1866 return Err(NatTraversalError::RateLimitExceeded);
1867 }
1868
1869 match self.security_state.validate_address(address, now) {
1871 AddressValidationResult::Invalid => {
1872 self.stats.invalid_address_rejections += 1;
1873 self.stats.security_rejections += 1;
1874 debug!("Invalid address rejected: {}", address);
1875 return Err(NatTraversalError::InvalidAddress);
1876 }
1877 AddressValidationResult::Suspicious => {
1878 self.stats.security_rejections += 1;
1879 debug!("Suspicious address rejected: {}", address);
1880 return Err(NatTraversalError::SecurityValidationFailed);
1881 }
1882 AddressValidationResult::Valid => {
1883 }
1885 }
1886
1887 if self.remote_candidates.len() >= self.max_candidates as usize {
1889 return Err(NatTraversalError::TooManyCandidates);
1890 }
1891
1892 if self.remote_candidates.values()
1894 .any(|c| c.address == address && c.state != CandidateState::Removed)
1895 {
1896 return Err(NatTraversalError::DuplicateAddress);
1897 }
1898
1899 let candidate = AddressCandidate {
1900 address,
1901 priority: priority.into_inner() as u32,
1902 source: CandidateSource::Peer,
1903 discovered_at: now,
1904 state: CandidateState::New,
1905 attempt_count: 0,
1906 last_attempt: None,
1907 };
1908
1909 self.remote_candidates.insert(sequence, candidate);
1910 self.stats.remote_candidates_received += 1;
1911
1912 trace!("Added remote candidate: {} with priority {}", address, priority);
1913 Ok(())
1914 }
1915
1916 pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
1918 if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
1919 candidate.state = CandidateState::Removed;
1920
1921 self.active_validations.remove(&candidate.address);
1923 true
1924 } else {
1925 false
1926 }
1927 }
1928
1929 #[allow(dead_code)] pub(super) fn add_local_candidate(
1932 &mut self,
1933 address: SocketAddr,
1934 source: CandidateSource,
1935 now: Instant,
1936 ) -> VarInt {
1937 let sequence = self.next_sequence;
1938 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
1939 .expect("sequence number overflow");
1940
1941 let candidate_type = classify_candidate_type(source);
1943 let local_preference = self.calculate_local_preference(address);
1944 let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
1945
1946 let candidate = AddressCandidate {
1947 address,
1948 priority,
1949 source,
1950 discovered_at: now,
1951 state: CandidateState::New,
1952 attempt_count: 0,
1953 last_attempt: None,
1954 };
1955
1956 self.local_candidates.insert(sequence, candidate);
1957 self.stats.local_candidates_sent += 1;
1958
1959 self.generate_candidate_pairs(now);
1961
1962 sequence
1963 }
1964
1965 #[allow(dead_code)] fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
1968 match addr {
1969 SocketAddr::V4(v4) => {
1970 if v4.ip().is_loopback() {
1971 0 } else if v4.ip().is_private() {
1973 65000 } else {
1975 32000 }
1977 }
1978 SocketAddr::V6(v6) => {
1979 if v6.ip().is_loopback() {
1980 0
1981 } else if v6.ip().is_unicast_link_local() {
1982 30000 } else {
1984 50000 }
1986 }
1987 }
1988 }
1989
1990 pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
1992 self.candidate_pairs.clear();
1993 self.pair_index.clear();
1994
1995 let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
1997 self.candidate_pairs.reserve(estimated_capacity);
1998 self.pair_index.reserve(estimated_capacity);
1999
2000 let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
2002
2003 for (_local_seq, local_candidate) in &self.local_candidates {
2004 if local_candidate.state == CandidateState::Removed {
2006 continue;
2007 }
2008
2009 let local_type = classify_candidate_type(local_candidate.source);
2011
2012 for (remote_seq, remote_candidate) in &self.remote_candidates {
2013 if remote_candidate.state == CandidateState::Removed {
2015 continue;
2016 }
2017
2018 let cache_key = (local_candidate.address, remote_candidate.address);
2020 let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
2021 are_candidates_compatible(local_candidate, remote_candidate)
2022 });
2023
2024 if !compatible {
2025 continue;
2026 }
2027
2028 let pair_priority = calculate_pair_priority(
2030 local_candidate.priority,
2031 remote_candidate.priority
2032 );
2033
2034 let remote_type = classify_candidate_type(remote_candidate.source);
2036 let pair_type = classify_pair_type(local_type, remote_type);
2037
2038 let pair = CandidatePair {
2039 remote_sequence: *remote_seq,
2040 local_addr: local_candidate.address,
2041 remote_addr: remote_candidate.address,
2042 priority: pair_priority,
2043 state: PairState::Waiting,
2044 pair_type,
2045 created_at: now,
2046 last_check: None,
2047 };
2048
2049 let index = self.candidate_pairs.len();
2051 self.pair_index.insert(remote_candidate.address, index);
2052 self.candidate_pairs.push(pair);
2053 }
2054 }
2055
2056 self.candidate_pairs.sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2058
2059 self.pair_index.clear();
2061 for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2062 self.pair_index.insert(pair.remote_addr, idx);
2063 }
2064
2065 trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2066 }
2067
2068 pub(super) fn get_next_validation_pairs(&mut self, max_concurrent: usize) -> Vec<&mut CandidatePair> {
2070 let mut result = Vec::with_capacity(max_concurrent);
2073
2074 for pair in self.candidate_pairs.iter_mut() {
2075 if pair.state == PairState::Waiting {
2076 result.push(pair);
2077 if result.len() >= max_concurrent {
2078 break;
2079 }
2080 }
2081 }
2082
2083 result
2084 }
2085
2086 pub(super) fn find_pair_by_remote_addr(&mut self, addr: SocketAddr) -> Option<&mut CandidatePair> {
2088 if let Some(&index) = self.pair_index.get(&addr) {
2090 self.candidate_pairs.get_mut(index)
2091 } else {
2092 None
2093 }
2094 }
2095
2096 pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2098 let (succeeded_type, succeeded_priority) = {
2100 if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2101 pair.state = PairState::Succeeded;
2102 (pair.pair_type, pair.priority)
2103 } else {
2104 return false;
2105 }
2106 };
2107
2108 for other_pair in &mut self.candidate_pairs {
2110 if other_pair.pair_type == succeeded_type
2111 && other_pair.priority < succeeded_priority
2112 && other_pair.state == PairState::Waiting {
2113 other_pair.state = PairState::Frozen;
2114 }
2115 }
2116
2117 true
2118 }
2119
2120
2121 pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2123 let mut best_ipv4: Option<&CandidatePair> = None;
2124 let mut best_ipv6: Option<&CandidatePair> = None;
2125
2126 for pair in &self.candidate_pairs {
2127 if pair.state != PairState::Succeeded {
2128 continue;
2129 }
2130
2131 match pair.remote_addr {
2132 SocketAddr::V4(_) => {
2133 if best_ipv4.map_or(true, |best| pair.priority > best.priority) {
2134 best_ipv4 = Some(pair);
2135 }
2136 }
2137 SocketAddr::V6(_) => {
2138 if best_ipv6.map_or(true, |best| pair.priority > best.priority) {
2139 best_ipv6 = Some(pair);
2140 }
2141 }
2142 }
2143 }
2144
2145 let mut result = Vec::new();
2146 if let Some(pair) = best_ipv4 {
2147 result.push(pair);
2148 }
2149 if let Some(pair) = best_ipv6 {
2150 result.push(pair);
2151 }
2152 result
2153 }
2154
2155 pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2157 let mut candidates: Vec<_> = self.remote_candidates
2158 .iter()
2159 .filter(|(_, c)| c.state == CandidateState::New)
2160 .map(|(k, v)| (*k, v))
2161 .collect();
2162
2163 candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2165 candidates
2166 }
2167
2168 pub(super) fn start_validation(
2170 &mut self,
2171 sequence: VarInt,
2172 challenge: u64,
2173 now: Instant,
2174 ) -> Result<(), NatTraversalError> {
2175 let candidate = self.remote_candidates.get_mut(&sequence)
2176 .ok_or(NatTraversalError::UnknownCandidate)?;
2177
2178 if candidate.state != CandidateState::New {
2179 return Err(NatTraversalError::InvalidCandidateState);
2180 }
2181
2182 if Self::is_validation_suspicious(candidate, now) {
2184 self.stats.security_rejections += 1;
2185 debug!("Suspicious validation attempt rejected for address {}", candidate.address);
2186 return Err(NatTraversalError::SecurityValidationFailed);
2187 }
2188
2189 if self.active_validations.len() >= 10 {
2191 debug!("Too many concurrent validations, rejecting new validation for {}", candidate.address);
2192 return Err(NatTraversalError::SecurityValidationFailed);
2193 }
2194
2195 candidate.state = CandidateState::Validating;
2197 candidate.attempt_count += 1;
2198 candidate.last_attempt = Some(now);
2199
2200 let validation = PathValidationState {
2202 challenge,
2203 sent_at: now,
2204 retry_count: 0,
2205 max_retries: 3, coordination_round: self.coordination.as_ref().map(|c| c.round),
2207 timeout_state: AdaptiveTimeoutState::new(),
2208 last_retry_at: None,
2209 };
2210
2211 self.active_validations.insert(candidate.address, validation);
2212 trace!("Started validation for candidate {} with challenge {}", candidate.address, challenge);
2213 Ok(())
2214 }
2215
2216 fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2218 if candidate.attempt_count > 10 {
2220 return true;
2221 }
2222
2223 if let Some(last_attempt) = candidate.last_attempt {
2225 let time_since_last = now.duration_since(last_attempt);
2226 if time_since_last < Duration::from_millis(100) {
2227 return true; }
2229 }
2230
2231 if candidate.state == CandidateState::Failed {
2233 let time_since_discovery = now.duration_since(candidate.discovered_at);
2234 if time_since_discovery < Duration::from_secs(60) {
2235 return true; }
2237 }
2238
2239 false
2240 }
2241
2242 pub(super) fn handle_validation_success(
2244 &mut self,
2245 remote_addr: SocketAddr,
2246 challenge: u64,
2247 now: Instant,
2248 ) -> Result<VarInt, NatTraversalError> {
2249 let sequence = self.remote_candidates
2251 .iter()
2252 .find(|(_, c)| c.address == remote_addr)
2253 .map(|(seq, _)| *seq)
2254 .ok_or(NatTraversalError::UnknownCandidate)?;
2255
2256 let validation = self.active_validations.get_mut(&remote_addr)
2258 .ok_or(NatTraversalError::NoActiveValidation)?;
2259
2260 if validation.challenge != challenge {
2261 return Err(NatTraversalError::ChallengeMismatch);
2262 }
2263
2264 let rtt = now.duration_since(validation.sent_at);
2266 validation.timeout_state.update_success(rtt);
2267
2268 self.network_monitor.record_success(rtt, now);
2270
2271 let candidate = self.remote_candidates.get_mut(&sequence)
2273 .ok_or(NatTraversalError::UnknownCandidate)?;
2274
2275 candidate.state = CandidateState::Valid;
2276 self.active_validations.remove(&remote_addr);
2277 self.stats.validations_succeeded += 1;
2278
2279 trace!("Validation successful for {} with RTT {:?}", remote_addr, rtt);
2280 Ok(sequence)
2281 }
2282
2283
2284 pub(super) fn start_coordination_round(
2286 &mut self,
2287 targets: Vec<PunchTarget>,
2288 now: Instant,
2289 ) -> Result<VarInt, NatTraversalError> {
2290 if self.security_state.is_coordination_rate_limited(now) {
2292 self.stats.rate_limit_violations += 1;
2293 debug!("Rate limit exceeded for coordination request with {} targets", targets.len());
2294 return Err(NatTraversalError::RateLimitExceeded);
2295 }
2296
2297 if self.is_coordination_suspicious(&targets, now) {
2299 self.stats.suspicious_coordination_attempts += 1;
2300 self.stats.security_rejections += 1;
2301 debug!("Suspicious coordination request rejected with {} targets", targets.len());
2302 return Err(NatTraversalError::SuspiciousCoordination);
2303 }
2304
2305 for target in &targets {
2307 match self.security_state.validate_address(target.remote_addr, now) {
2308 AddressValidationResult::Invalid => {
2309 self.stats.invalid_address_rejections += 1;
2310 self.stats.security_rejections += 1;
2311 debug!("Invalid target address in coordination: {}", target.remote_addr);
2312 return Err(NatTraversalError::InvalidAddress);
2313 }
2314 AddressValidationResult::Suspicious => {
2315 self.stats.security_rejections += 1;
2316 debug!("Suspicious target address in coordination: {}", target.remote_addr);
2317 return Err(NatTraversalError::SecurityValidationFailed);
2318 }
2319 AddressValidationResult::Valid => {
2320 }
2322 }
2323 }
2324
2325 let round = self.next_sequence;
2326 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2327 .expect("sequence number overflow");
2328
2329 let coordination_grace = Duration::from_millis(500); let punch_start = now + coordination_grace;
2332
2333 self.coordination = Some(CoordinationState {
2334 round,
2335 punch_targets: targets,
2336 round_start: now,
2337 punch_start,
2338 round_duration: self.coordination_timeout,
2339 state: CoordinationPhase::Requesting,
2340 punch_request_sent: false,
2341 peer_punch_received: false,
2342 retry_count: 0,
2343 max_retries: 3,
2344 timeout_state: AdaptiveTimeoutState::new(),
2345 last_retry_at: None,
2346 });
2347
2348 self.stats.coordination_rounds += 1;
2349 trace!("Started coordination round {} with {} targets", round, self.coordination.as_ref().unwrap().punch_targets.len());
2350 Ok(round)
2351 }
2352
2353 fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2355 if targets.len() > 20 {
2357 return true;
2358 }
2359
2360 let mut seen_addresses = std::collections::HashSet::new();
2362 for target in targets {
2363 if !seen_addresses.insert(target.remote_addr) {
2364 return true; }
2366 }
2367
2368 if targets.len() > 5 {
2370 let mut ipv4_addresses: Vec<_> = targets
2372 .iter()
2373 .filter_map(|t| match t.remote_addr.ip() {
2374 IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2375 _ => None,
2376 })
2377 .collect();
2378
2379 if ipv4_addresses.len() >= 3 {
2380 ipv4_addresses.sort();
2381 let mut sequential_count = 1;
2382 for i in 1..ipv4_addresses.len() {
2383 if ipv4_addresses[i] == ipv4_addresses[i-1] + 1 {
2384 sequential_count += 1;
2385 if sequential_count >= 3 {
2386 return true; }
2388 } else {
2389 sequential_count = 1;
2390 }
2391 }
2392 }
2393 }
2394
2395 false
2396 }
2397
2398 pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2400 self.coordination.as_ref().map(|c| c.state)
2401 }
2402
2403 pub(super) fn should_send_punch_request(&self) -> bool {
2405 if let Some(coord) = &self.coordination {
2406 coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2407 } else {
2408 false
2409 }
2410 }
2411
2412 pub(super) fn mark_punch_request_sent(&mut self) {
2414 if let Some(coord) = &mut self.coordination {
2415 coord.punch_request_sent = true;
2416 coord.state = CoordinationPhase::Coordinating;
2417 trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2418 }
2419 }
2420
2421 pub(super) fn handle_peer_punch_request(&mut self, peer_round: VarInt, now: Instant) -> Result<bool, NatTraversalError> {
2423 if self.is_peer_coordination_suspicious(peer_round, now) {
2425 self.stats.suspicious_coordination_attempts += 1;
2426 self.stats.security_rejections += 1;
2427 debug!("Suspicious peer coordination request rejected for round {}", peer_round);
2428 return Err(NatTraversalError::SuspiciousCoordination);
2429 }
2430
2431 if let Some(coord) = &mut self.coordination {
2432 if coord.round == peer_round {
2433 match coord.state {
2434 CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2435 coord.peer_punch_received = true;
2436 coord.state = CoordinationPhase::Preparing;
2437
2438 let network_rtt = self.network_monitor.get_estimated_rtt()
2440 .unwrap_or(Duration::from_millis(100));
2441 let quality_score = self.network_monitor.get_quality_score();
2442
2443 let base_grace = Duration::from_millis(150);
2445 let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2446 let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2447
2448 let adaptive_grace = Duration::from_millis(
2449 (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64
2450 );
2451
2452 coord.punch_start = now + adaptive_grace;
2453
2454 trace!("Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2455 adaptive_grace, network_rtt, quality_score);
2456 Ok(true)
2457 }
2458 CoordinationPhase::Preparing => {
2459 trace!("Peer coordination confirmed during preparation");
2461 Ok(true)
2462 }
2463 _ => {
2464 debug!("Received coordination in unexpected phase: {:?}", coord.state);
2465 Ok(false)
2466 }
2467 }
2468 } else {
2469 debug!("Received coordination for wrong round: {} vs {}", peer_round, coord.round);
2470 Ok(false)
2471 }
2472 } else {
2473 debug!("Received peer coordination but no active round");
2474 Ok(false)
2475 }
2476 }
2477
2478 fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2480 if peer_round.into_inner() == 0 {
2482 return true; }
2484
2485 if let Some(coord) = &self.coordination {
2487 let our_round = coord.round.into_inner();
2488 let peer_round_num = peer_round.into_inner();
2489
2490 if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2492 return true;
2493 }
2494 }
2495
2496 false
2497 }
2498
2499 pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2501 if let Some(coord) = &self.coordination {
2502 match coord.state {
2503 CoordinationPhase::Preparing => now >= coord.punch_start,
2504 CoordinationPhase::Coordinating => {
2505 coord.peer_punch_received && now >= coord.punch_start
2507 }
2508 _ => false,
2509 }
2510 } else {
2511 false
2512 }
2513 }
2514
2515 pub(super) fn start_punching_phase(&mut self, now: Instant) {
2517 if let Some(coord) = &mut self.coordination {
2518 coord.state = CoordinationPhase::Punching;
2519
2520 let network_rtt = self.network_monitor.get_estimated_rtt()
2522 .unwrap_or(Duration::from_millis(100));
2523
2524 let jitter_ms: u64 = rand::random::<u64>() % 11;
2526 let jitter = Duration::from_millis(jitter_ms);
2527 let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2528
2529 coord.punch_start = transmission_time.max(now);
2531
2532 trace!("Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2533 coord.punch_start, network_rtt, jitter);
2534 }
2535 }
2536
2537 pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2539 self.coordination.as_ref().map(|c| c.punch_targets.as_slice())
2540 }
2541
2542
2543 pub(super) fn mark_coordination_validating(&mut self) {
2545 if let Some(coord) = &mut self.coordination {
2546 if coord.state == CoordinationPhase::Punching {
2547 coord.state = CoordinationPhase::Validating;
2548 trace!("Coordination moved to validation phase");
2549 }
2550 }
2551 }
2552
2553 pub(super) fn handle_coordination_success(&mut self, remote_addr: SocketAddr, now: Instant) -> bool {
2555 if let Some(coord) = &mut self.coordination {
2556 let was_target = coord.punch_targets.iter().any(|target| target.remote_addr == remote_addr);
2558
2559 if was_target && coord.state == CoordinationPhase::Validating {
2560 let rtt = now.duration_since(coord.round_start);
2562 coord.timeout_state.update_success(rtt);
2563 self.network_monitor.record_success(rtt, now);
2564
2565 coord.state = CoordinationPhase::Succeeded;
2566 self.stats.direct_connections += 1;
2567 trace!("Coordination succeeded via {} with RTT {:?}", remote_addr, rtt);
2568 true
2569 } else {
2570 false
2571 }
2572 } else {
2573 false
2574 }
2575 }
2576
2577 pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2579 if let Some(coord) = &mut self.coordination {
2580 coord.retry_count += 1;
2581 coord.timeout_state.update_timeout();
2582 self.network_monitor.record_timeout(now);
2583
2584 if coord.timeout_state.should_retry(coord.max_retries)
2586 && self.network_monitor.is_suitable_for_coordination() {
2587
2588 coord.state = CoordinationPhase::Requesting;
2590 coord.punch_request_sent = false;
2591 coord.peer_punch_received = false;
2592 coord.round_start = now;
2593 coord.last_retry_at = Some(now);
2594
2595 let retry_delay = coord.timeout_state.get_retry_delay();
2597
2598 let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2600 let adjusted_delay = Duration::from_millis(
2601 (retry_delay.as_millis() as f64 * quality_multiplier) as u64
2602 );
2603
2604 coord.punch_start = now + adjusted_delay;
2605
2606 trace!("Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2607 coord.round, coord.retry_count + 1, adjusted_delay, self.network_monitor.get_quality_score());
2608 true
2609 } else {
2610 coord.state = CoordinationPhase::Failed;
2611 self.stats.coordination_failures += 1;
2612
2613 if !self.network_monitor.is_suitable_for_coordination() {
2614 trace!("Coordination failed due to poor network conditions (quality: {:.2})",
2615 self.network_monitor.get_quality_score());
2616 } else {
2617 trace!("Coordination failed after {} attempts", coord.retry_count);
2618 }
2619 false
2620 }
2621 } else {
2622 false
2623 }
2624 }
2625
2626
2627 pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2629 if let Some(coord) = &mut self.coordination {
2630 let timeout = coord.timeout_state.get_timeout();
2631 let elapsed = now.duration_since(coord.round_start);
2632
2633 if elapsed > timeout {
2634 trace!("Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2635 coord.round, elapsed, timeout);
2636 self.handle_coordination_failure(now);
2637 true
2638 } else {
2639 false
2640 }
2641 } else {
2642 false
2643 }
2644 }
2645
2646
2647 #[allow(dead_code)] pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2650 let mut expired_validations = Vec::new();
2651 let mut retry_validations = Vec::new();
2652
2653 for (addr, validation) in &mut self.active_validations {
2654 let timeout = validation.timeout_state.get_timeout();
2655 let elapsed = now.duration_since(validation.sent_at);
2656
2657 if elapsed >= timeout {
2658 if validation.timeout_state.should_retry(validation.max_retries) {
2659 retry_validations.push(*addr);
2661 } else {
2662 expired_validations.push(*addr);
2664 }
2665 }
2666 }
2667
2668 for addr in retry_validations {
2670 if let Some(validation) = self.active_validations.get_mut(&addr) {
2671 validation.retry_count += 1;
2672 validation.sent_at = now;
2673 validation.last_retry_at = Some(now);
2674 validation.timeout_state.update_timeout();
2675
2676 trace!("Retrying validation for {} (attempt {})", addr, validation.retry_count + 1);
2677 }
2678 }
2679
2680 for addr in &expired_validations {
2682 self.active_validations.remove(addr);
2683 self.network_monitor.record_timeout(now);
2684 trace!("Validation expired for {}", addr);
2685 }
2686
2687 expired_validations
2688 }
2689
2690 #[allow(dead_code)] pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2693 let mut retry_addresses = Vec::new();
2694
2695 for (addr, validation) in &mut self.active_validations {
2697 let elapsed = now.duration_since(validation.sent_at);
2698 let timeout = validation.timeout_state.get_timeout();
2699
2700 if elapsed > timeout && validation.timeout_state.should_retry(validation.max_retries) {
2701 validation.retry_count += 1;
2703 validation.last_retry_at = Some(now);
2704 validation.sent_at = now; validation.timeout_state.update_timeout();
2706
2707 retry_addresses.push(*addr);
2708 trace!("Scheduled retry {} for validation to {}", validation.retry_count, addr);
2709 }
2710 }
2711
2712 retry_addresses
2713 }
2714
2715
2716 #[allow(dead_code)] pub(super) fn update_network_conditions(&mut self, now: Instant) {
2719 self.network_monitor.cleanup(now);
2720
2721 let multiplier = self.network_monitor.get_timeout_multiplier();
2723
2724 for validation in self.active_validations.values_mut() {
2726 if multiplier > 1.5 {
2727 validation.timeout_state.backoff_multiplier =
2729 (validation.timeout_state.backoff_multiplier * 1.2).min(validation.timeout_state.max_backoff_multiplier);
2730 } else if multiplier < 0.8 {
2731 validation.timeout_state.backoff_multiplier =
2733 (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2734 }
2735 }
2736 }
2737
2738
2739 #[allow(dead_code)] pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2742 if let Some(coord) = &self.coordination {
2743 if coord.retry_count > 0 {
2744 if let Some(last_retry) = coord.last_retry_at {
2745 let retry_delay = coord.timeout_state.get_retry_delay();
2746 return now.duration_since(last_retry) >= retry_delay;
2747 }
2748 }
2749 }
2750 false
2751 }
2752
2753
2754 #[allow(dead_code)] pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2757 self.resource_manager.update_stats(
2759 self.active_validations.len(),
2760 self.local_candidates.len(),
2761 self.remote_candidates.len(),
2762 self.candidate_pairs.len()
2763 );
2764
2765 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2767 self.active_validations.len(),
2768 self.local_candidates.len(),
2769 self.remote_candidates.len(),
2770 self.candidate_pairs.len()
2771 );
2772
2773 let mut cleaned = 0;
2775
2776 if self.resource_manager.should_cleanup(now) {
2777 cleaned += self.resource_manager.cleanup_expired_resources(
2778 &mut self.active_validations,
2779 &mut self.local_candidates,
2780 &mut self.remote_candidates,
2781 &mut self.candidate_pairs,
2782 &mut self.coordination,
2783 now
2784 );
2785
2786 if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2788 cleaned += self.resource_manager.aggressive_cleanup(
2789 &mut self.active_validations,
2790 &mut self.local_candidates,
2791 &mut self.remote_candidates,
2792 &mut self.candidate_pairs,
2793 now
2794 );
2795 }
2796 }
2797
2798 cleaned
2799 }
2800
2801
2802 pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
2804 self.resource_manager.update_stats(
2806 self.active_validations.len(),
2807 self.local_candidates.len(),
2808 self.remote_candidates.len(),
2809 self.candidate_pairs.len()
2810 );
2811 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2812 self.active_validations.len(),
2813 self.local_candidates.len(),
2814 self.remote_candidates.len(),
2815 self.candidate_pairs.len()
2816 );
2817
2818 if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
2820 self.resource_manager.stats.allocation_failures += 1;
2821 return true;
2822 }
2823
2824 if self.resource_manager.check_resource_limits(self) {
2826 self.resource_manager.stats.allocation_failures += 1;
2827 return true;
2828 }
2829
2830 false
2831 }
2832
2833
2834 pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
2836 let mut next_timeout = None;
2837
2838 if let Some(coord) = &self.coordination {
2840 match coord.state {
2841 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2842 let timeout_at = coord.round_start + self.coordination_timeout;
2843 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2844 }
2845 CoordinationPhase::Preparing => {
2846 next_timeout = Some(next_timeout.map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)));
2848 }
2849 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2850 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2852 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2853 }
2854 _ => {}
2855 }
2856 }
2857
2858 for (_, validation) in &self.active_validations {
2860 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2861 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2862 }
2863
2864 if self.resource_manager.should_cleanup(now) {
2866 let cleanup_at = now + Duration::from_secs(1);
2868 next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
2869 }
2870
2871 next_timeout
2872 }
2873
    /// Drives all timer-based state on a timeout tick and reports what the caller should do.
    ///
    /// In order, this:
    /// 1. advances the coordination phase machine (if a coordination is active),
    /// 2. retries or expires active path validations,
    /// 3. runs resource cleanup when the resource manager says it is due,
    /// 4. refreshes the network quality score,
    /// 5. requests a discovery retry when no coordination exists but both
    ///    candidate sets are non-empty.
    ///
    /// Returns the collected [`TimeoutAction`]s. No code path in this function
    /// produces an `Err`.
    pub(super) fn handle_timeout(&mut self, now: Instant) -> Result<Vec<TimeoutAction>, NatTraversalError> {
        let mut actions = Vec::new();

        if let Some(coord) = &mut self.coordination {
            match coord.state {
                CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
                    // Deadline uses the fixed, state-wide coordination timeout.
                    let timeout_at = coord.round_start + self.coordination_timeout;
                    if now >= timeout_at {
                        coord.retry_count += 1;
                        if coord.retry_count >= coord.max_retries {
                            debug!("Coordination failed after {} retries", coord.retry_count);
                            coord.state = CoordinationPhase::Failed;
                            actions.push(TimeoutAction::Failed);
                        } else {
                            debug!("Coordination timeout, retrying ({}/{})", coord.retry_count, coord.max_retries);
                            // Restart the round: back to Requesting with a fresh start time.
                            coord.state = CoordinationPhase::Requesting;
                            coord.round_start = now;
                            actions.push(TimeoutAction::RetryCoordination);
                        }
                    }
                }
                CoordinationPhase::Preparing => {
                    // Preparation ends at the agreed punch start time.
                    if now >= coord.punch_start {
                        debug!("Starting coordinated hole punching");
                        coord.state = CoordinationPhase::Punching;
                        actions.push(TimeoutAction::StartValidation);
                    }
                }
                CoordinationPhase::Punching | CoordinationPhase::Validating => {
                    // Deadline uses the round's adaptive timeout.
                    let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
                    if now >= timeout_at {
                        coord.retry_count += 1;
                        if coord.retry_count >= coord.max_retries {
                            debug!("Validation failed after {} retries", coord.retry_count);
                            coord.state = CoordinationPhase::Failed;
                            actions.push(TimeoutAction::Failed);
                        } else {
                            debug!("Validation timeout, retrying ({}/{})", coord.retry_count, coord.max_retries);
                            // NOTE(review): unlike the Requesting retry above, `round_start`
                            // is not reset here, so this deadline stays expired on the next
                            // call unless another code path updates it — confirm intended.
                            coord.state = CoordinationPhase::Punching;
                            actions.push(TimeoutAction::StartValidation);
                        }
                    }
                }
                CoordinationPhase::Succeeded => {
                    // Terminal phases are reported on every call while the coordination exists.
                    actions.push(TimeoutAction::Complete);
                }
                CoordinationPhase::Failed => {
                    actions.push(TimeoutAction::Failed);
                }
                _ => {}
            }
        }

        // Path validations: bump the retry counter on timeout, then either give
        // up (collect for removal) or re-arm the validation from `now`.
        let mut expired_validations = Vec::new();
        for (addr, validation) in &mut self.active_validations {
            let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
            if now >= timeout_at {
                validation.retry_count += 1;
                if validation.retry_count >= validation.max_retries {
                    debug!("Path validation failed for {}: max retries exceeded", addr);
                    expired_validations.push(*addr);
                } else {
                    debug!("Path validation timeout for {}, retrying ({}/{})",
                           addr, validation.retry_count, validation.max_retries);
                    validation.sent_at = now;
                    validation.last_retry_at = Some(now);
                    actions.push(TimeoutAction::StartValidation);
                }
            }
        }

        // Removal is deferred so the map is not mutated while being iterated.
        for addr in expired_validations {
            self.active_validations.remove(&addr);
        }

        if self.resource_manager.should_cleanup(now) {
            self.resource_manager.perform_cleanup(now);
        }

        self.network_monitor.update_quality_score(now);

        // With candidates on both sides but no coordination in flight, ask the
        // caller to retry discovery.
        if self.coordination.is_none() && !self.local_candidates.is_empty() && !self.remote_candidates.is_empty() {
            actions.push(TimeoutAction::RetryDiscovery);
        }

        Ok(actions)
    }
2969
2970 #[allow(dead_code)] pub(super) fn handle_address_observation(
2976 &mut self,
2977 peer_id: [u8; 32],
2978 observed_address: SocketAddr,
2979 connection_id: crate::shared::ConnectionId,
2980 peer_role: NatTraversalRole,
2981 now: Instant,
2982 ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
2983 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
2984 let connection_context = ConnectionContext {
2985 connection_id,
2986 original_destination: observed_address, peer_role,
2988 transport_params: None,
2989 };
2990
2991 bootstrap_coordinator.observe_peer_address(
2993 peer_id,
2994 observed_address,
2995 connection_context,
2996 now,
2997 )?;
2998
2999 let sequence = self.next_sequence;
3001 self.next_sequence = VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3002
3003 let priority = VarInt::from_u32(100); let add_address_frame = bootstrap_coordinator.generate_add_address_frame(
3005 peer_id,
3006 sequence,
3007 priority,
3008 );
3009
3010 Ok(add_address_frame)
3011 } else {
3012 Ok(None)
3014 }
3015 }
3016
3017 pub(super) fn handle_punch_me_now_frame(
3022 &mut self,
3023 from_peer: [u8; 32],
3024 source_addr: SocketAddr,
3025 frame: &crate::frame::PunchMeNow,
3026 now: Instant,
3027 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3028 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3029 bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3030 } else {
3031 Ok(None)
3033 }
3034 }
3035
3036 #[allow(dead_code)] pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3041 self.bootstrap_coordinator
3042 .as_ref()
3043 .and_then(|coord| coord.get_peer_record(peer_id))
3044 .map(|record| record.observed_address)
3045 }
3046
3047 pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3049 debug!("Starting candidate discovery for NAT traversal");
3050
3051 if self.local_candidates.is_empty() {
3053 debug!("Local candidates will be populated by discovery manager");
3056 }
3057
3058 Ok(())
3059 }
3060
3061 #[allow(dead_code)] pub(super) fn queue_add_address_frame(
3064 &mut self,
3065 sequence: VarInt,
3066 address: SocketAddr,
3067 priority: u32,
3068 ) -> Result<(), NatTraversalError> {
3069 debug!("Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3070 sequence, address, priority);
3071
3072 let candidate = AddressCandidate {
3074 address,
3075 priority,
3076 source: CandidateSource::Local,
3077 discovered_at: Instant::now(),
3078 state: CandidateState::New,
3079 attempt_count: 0,
3080 last_attempt: None,
3081 };
3082
3083 if !self.local_candidates.values().any(|c| c.address == address) {
3085 self.local_candidates.insert(sequence, candidate);
3086 }
3087
3088 Ok(())
3089 }
3090}
3091
/// Errors reported by the NAT traversal state and the bootstrap coordinator.
///
/// The `Display` implementation renders each variant as a short lowercase phrase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub(crate) enum NatTraversalError {
    /// Displayed as "too many candidates".
    TooManyCandidates,
    /// Displayed as "duplicate address".
    DuplicateAddress,
    /// Displayed as "unknown candidate".
    UnknownCandidate,
    /// Displayed as "invalid candidate state".
    InvalidCandidateState,
    /// Displayed as "no active validation".
    NoActiveValidation,
    /// Displayed as "challenge mismatch".
    ChallengeMismatch,
    /// Displayed as "no active coordination".
    NoActiveCoordination,
    /// Security validation rejected the input, e.g. an observed address was
    /// classified as suspicious.
    SecurityValidationFailed,
    /// A rate limit was hit (candidate rate or adaptive per-peer rate).
    RateLimitExceeded,
    /// An address failed validation.
    InvalidAddress,
    /// Displayed as "suspicious coordination request".
    SuspiciousCoordination,
    /// Displayed as "resource limit exceeded".
    ResourceLimitExceeded,
}
3121
3122impl std::fmt::Display for NatTraversalError {
3123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3124 match self {
3125 Self::TooManyCandidates => write!(f, "too many candidates"),
3126 Self::DuplicateAddress => write!(f, "duplicate address"),
3127 Self::UnknownCandidate => write!(f, "unknown candidate"),
3128 Self::InvalidCandidateState => write!(f, "invalid candidate state"),
3129 Self::NoActiveValidation => write!(f, "no active validation"),
3130 Self::ChallengeMismatch => write!(f, "challenge mismatch"),
3131 Self::NoActiveCoordination => write!(f, "no active coordination"),
3132 Self::SecurityValidationFailed => write!(f, "security validation failed"),
3133 Self::RateLimitExceeded => write!(f, "rate limit exceeded"),
3134 Self::InvalidAddress => write!(f, "invalid address"),
3135 Self::SuspiciousCoordination => write!(f, "suspicious coordination request"),
3136 Self::ResourceLimitExceeded => write!(f, "resource limit exceeded"),
3137 }
3138 }
3139}
3140
// Marker impl: relies on the default `Error` methods; the message comes from `Display`.
impl std::error::Error for NatTraversalError {}
3142
/// Snapshot of security-related counters and current rates.
///
/// NOTE(review): the code that populates this struct is outside this section;
/// the field descriptions below are derived from the field names only.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct SecurityStats {
    /// Total number of security rejections.
    pub total_security_rejections: u32,
    /// Number of rate-limit violations.
    pub rate_limit_violations: u32,
    /// Number of rejections caused by invalid addresses.
    pub invalid_address_rejections: u32,
    /// Number of coordination attempts flagged as suspicious.
    pub suspicious_coordination_attempts: u32,
    /// Number of validations currently active.
    pub active_validations: usize,
    /// Number of cached address validation results.
    pub cached_address_validations: usize,
    /// Current candidate rate (unit/window not visible here — TODO confirm).
    pub current_candidate_rate: usize,
    /// Current coordination rate (unit/window not visible here — TODO confirm).
    pub current_coordination_rate: usize,
}
3164
/// State kept by a node that observes peer addresses and relays hole-punch
/// coordination (PUNCH_ME_NOW) between peers.
#[derive(Debug)]
pub(crate) struct BootstrapCoordinator {
    /// Latest observation record per peer; written by `observe_peer_address`
    /// and pruned by `cleanup_expired_sessions`.
    peer_registry: HashMap<PeerId, PeerObservationRecord>,
    /// Coordination sessions keyed by their session id.
    coordination_sessions: HashMap<CoordinationSessionId, CoordinationSession>,
    /// Queue of pending coordination requests (created empty by `new`).
    #[allow(dead_code)]
    pending_coordination: VecDeque<PendingCoordinationRequest>,
    /// Per-address observation bookkeeping, keyed by the observed address.
    #[allow(dead_code)]
    address_observations: HashMap<SocketAddr, AddressObservation>,
    /// Validator used for address checks and rate limiting.
    security_validator: SecurityValidationState,
    /// Counters exposed through `get_stats`.
    stats: BootstrapStats,
    /// Configuration supplied to `new`; stored but not read in the visible code.
    _config: BootstrapConfig,
    /// Initialised to `None` by `new`; not updated in the visible code.
    _last_cleanup: Option<Instant>,
}
3190
/// Identifier of a coordination session (generated randomly by `generate_session_id`).
type CoordinationSessionId = u64;
3193
/// 32-byte peer identifier.
type PeerId = [u8; 32];
3196
/// What the bootstrap coordinator knows about one observed peer.
#[derive(Debug, Clone)]
pub(crate) struct PeerObservationRecord {
    /// Identifier of the observed peer.
    #[allow(dead_code)]
    peer_id: PeerId,
    /// Address the peer was observed at; used for ADD_ADDRESS and relayed
    /// PUNCH_ME_NOW frames.
    observed_address: SocketAddr,
    /// Time of the observation; records older than one hour are pruned by
    /// `cleanup_expired_sessions`.
    #[allow(dead_code)]
    observed_at: Instant,
    /// Connection details captured with the observation.
    #[allow(dead_code)]
    connection_context: ConnectionContext,
    /// Cleared by `update_peer_coordination_stats` once the success rate drops
    /// below 0.1 after more than 10 coordinations.
    #[allow(dead_code)]
    can_coordinate: bool,
    /// Number of coordination outcomes recorded for this peer.
    #[allow(dead_code)]
    coordination_count: u32,
    /// Exponential moving average of coordination success (starts at 1.0).
    #[allow(dead_code)]
    success_rate: f64,
}
3221
/// Connection details stored alongside a peer observation.
#[derive(Debug, Clone)]
pub(crate) struct ConnectionContext {
    /// Identifier of the connection the observation was made on.
    #[allow(dead_code)]
    connection_id: ConnectionId,
    /// `handle_address_observation` fills this with the observed address.
    #[allow(dead_code)]
    original_destination: SocketAddr,
    /// NAT traversal role of the observed peer.
    #[allow(dead_code)]
    peer_role: NatTraversalRole,
    /// Optional NAT traversal transport parameters
    /// (`handle_address_observation` passes `None`).
    #[allow(dead_code)]
    transport_params: Option<NatTraversalTransportParams>,
}
3238
/// NAT traversal transport parameters attached to a connection context.
///
/// NOTE(review): no visible code reads these fields; descriptions follow the
/// field names only.
#[derive(Debug, Clone)]
struct NatTraversalTransportParams {
    /// Maximum number of candidates.
    #[allow(dead_code)]
    max_candidates: u32,
    /// Coordination timeout.
    #[allow(dead_code)]
    coordination_timeout: Duration,
    /// Whether advanced features are supported (which ones is not visible here).
    #[allow(dead_code)]
    supports_advanced_features: bool,
}
3252
/// Bookkeeping for one observed socket address.
#[derive(Debug, Clone)]
struct AddressObservation {
    /// The observed address (also the map key in `address_observations`).
    #[allow(dead_code)]
    address: SocketAddr,
    /// Time of the first observation; entries older than one hour (measured
    /// from this instant) are pruned by `cleanup_expired_sessions`.
    #[allow(dead_code)]
    first_observed: Instant,
    /// Number of times the address has been observed.
    #[allow(dead_code)]
    observation_count: u32,
    /// Validation result stored when the entry is created (`Valid`).
    #[allow(dead_code)]
    validation_state: AddressValidationResult,
    /// Distinct peers that have been observed at this address.
    #[allow(dead_code)]
    associated_peers: Vec<PeerId>,
}
3272
/// One hole-punch coordination between two peers, tracked by the bootstrap coordinator.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct CoordinationSession {
    /// Identifier of this session.
    session_id: CoordinationSessionId,
    /// Peer whose PUNCH_ME_NOW frame created the session.
    peer_a: PeerId,
    /// Target peer named in that frame.
    peer_b: PeerId,
    /// Coordination round taken from the frame.
    current_round: VarInt,
    /// Creation time; all phase timeouts are measured from this instant.
    started_at: Instant,
    /// Current phase of the session state machine.
    phase: CoordinationPhase,
    /// `(address, sequence)` pairs to punch towards.
    target_addresses: Vec<(SocketAddr, VarInt)>,
    /// Readiness flags of both peers.
    sync_state: SynchronizationState,
    /// Per-session counters.
    stats: CoordinationSessionStats,
}
3296
/// Readiness flags for the two peers of a coordination session.
#[derive(Debug, Clone)]
struct SynchronizationState {
    /// Set to `true` when the session is created.
    peer_a_ready: bool,
    /// Set to `true` when a follow-up PUNCH_ME_NOW without a target peer is
    /// matched to the session.
    peer_b_ready: bool,
}
3305
/// Per-session coordination counters.
#[derive(Debug, Clone, Default)]
struct CoordinationSessionStats {
    /// Incremented when both peers of the session are ready.
    successful_coordinations: u32,
}
3312
/// Placeholder element type for the pending-coordination queue; carries no data yet.
#[derive(Debug, Clone)]
struct PendingCoordinationRequest {
    _unused: (),
}
3318
/// Configuration for [`BootstrapCoordinator`]; currently carries no settings.
#[derive(Debug, Clone)]
pub(crate) struct BootstrapConfig {
    _unused: (),
}
3324
/// Counters maintained by [`BootstrapCoordinator`].
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapStats {
    /// Number of accepted peer address observations.
    #[allow(dead_code)]
    total_observations: u64,
    /// Number of coordination sessions created.
    total_coordinations: u64,
    /// Number of coordinations in which both peers became ready.
    successful_coordinations: u64,
    /// Current size of the peer registry.
    #[allow(dead_code)]
    active_peers: usize,
    /// Current number of coordination sessions.
    active_sessions: usize,
    /// Number of requests rejected by security checks or rate limiting.
    security_rejections: u64,
}
3343
/// Events emitted while polling the coordination session state machine.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) enum CoordinationSessionEvent {
    /// A session moved from `old_phase` to `new_phase`.
    PhaseChanged {
        session_id: CoordinationSessionId,
        old_phase: CoordinationPhase,
        new_phase: CoordinationPhase,
    },
    /// A session failed; `reason` is a human-readable explanation.
    SessionFailed {
        session_id: CoordinationSessionId,
        peer_a: PeerId,
        peer_b: PeerId,
        reason: String,
    },
    /// A session entered the punching phase; carries the session's target addresses.
    StartHolePunching {
        session_id: CoordinationSessionId,
        peer_a: PeerId,
        peer_b: PeerId,
        target_addresses: Vec<(SocketAddr, VarInt)>,
    },
    /// A finished session is old enough to be cleaned up.
    ReadyForCleanup {
        session_id: CoordinationSessionId,
    },
}
3373
/// Internal triggers that move a coordination session to its next phase.
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
enum SessionAdvancementEvent {
    /// Both peers are ready (Requesting -> Coordinating).
    BothPeersReady,
    /// The coordinating delay has passed (Coordinating -> Preparing).
    CoordinationComplete,
    /// The preparation delay has passed (Preparing -> Punching).
    PreparationComplete,
    /// The punching window has passed (Punching -> Validating).
    PunchingComplete,
    /// Validation took too long (Validating -> Failed).
    ValidationTimeout,
    /// The session timed out while requesting, or sits in the Idle phase.
    Timeout,
    /// A succeeded/failed session is old enough to be cleaned up.
    ReadyForCleanup,
}
3393
/// Recovery step to take for a coordination session.
///
/// NOTE(review): the code that selects these actions is outside this section.
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
pub(crate) enum CoordinationRecoveryAction {
    /// Do nothing.
    NoAction,
    /// Retry after a backoff delay.
    RetryWithBackoff,
    /// Mark the session as failed.
    MarkAsFailed,
    /// Clean the session up.
    Cleanup,
}
3407
3408impl BootstrapCoordinator {
3409 pub(crate) fn new(config: BootstrapConfig) -> Self {
3411 Self {
3412 peer_registry: HashMap::new(),
3413 coordination_sessions: HashMap::new(),
3414 pending_coordination: VecDeque::new(),
3415 address_observations: HashMap::new(),
3416 security_validator: SecurityValidationState::new(),
3417 stats: BootstrapStats::default(),
3418 _config: config,
3419 _last_cleanup: None,
3420 }
3421 }
3422
3423 #[allow(dead_code)] pub(crate) fn observe_peer_address(
3429 &mut self,
3430 peer_id: PeerId,
3431 observed_address: SocketAddr,
3432 connection_context: ConnectionContext,
3433 now: Instant,
3434 ) -> Result<(), NatTraversalError> {
3435 match self.security_validator.validate_address(observed_address, now) {
3437 AddressValidationResult::Valid => {},
3438 AddressValidationResult::Invalid => {
3439 self.stats.security_rejections += 1;
3440 return Err(NatTraversalError::InvalidAddress);
3441 }
3442 AddressValidationResult::Suspicious => {
3443 self.stats.security_rejections += 1;
3444 return Err(NatTraversalError::SecurityValidationFailed);
3445 }
3446 }
3447
3448 if self.security_validator.is_candidate_rate_limited(now) {
3450 self.stats.security_rejections += 1;
3451 return Err(NatTraversalError::RateLimitExceeded);
3452 }
3453
3454 let observation = self.address_observations.entry(observed_address)
3456 .or_insert_with(|| AddressObservation {
3457 address: observed_address,
3458 first_observed: now,
3459 observation_count: 0,
3460 validation_state: AddressValidationResult::Valid,
3461 associated_peers: Vec::new(),
3462 });
3463
3464 observation.observation_count += 1;
3465 if !observation.associated_peers.contains(&peer_id) {
3466 observation.associated_peers.push(peer_id);
3467 }
3468
3469 let peer_record = PeerObservationRecord {
3471 peer_id,
3472 observed_address,
3473 observed_at: now,
3474 connection_context,
3475 can_coordinate: true, coordination_count: 0,
3477 success_rate: 1.0,
3478 };
3479
3480 self.peer_registry.insert(peer_id, peer_record);
3481 self.stats.total_observations += 1;
3482 self.stats.active_peers = self.peer_registry.len();
3483
3484 debug!("Observed peer {:?} at address {} (total observations: {})",
3485 peer_id, observed_address, self.stats.total_observations);
3486
3487 Ok(())
3488 }
3489
3490 #[allow(dead_code)] pub(crate) fn generate_add_address_frame(
3496 &self,
3497 peer_id: PeerId,
3498 sequence: VarInt,
3499 priority: VarInt,
3500 ) -> Option<crate::frame::AddAddress> {
3501 if let Some(peer_record) = self.peer_registry.get(&peer_id) {
3502 Some(crate::frame::AddAddress {
3503 sequence,
3504 address: peer_record.observed_address,
3505 priority,
3506 })
3507 } else {
3508 None
3509 }
3510 }
3511
    /// Processes a PUNCH_ME_NOW frame received from `from_peer`.
    ///
    /// The frame first has to pass three checks (adaptive rate limit, enhanced
    /// address validation, frame validation); each failure increments
    /// `stats.security_rejections` and is returned as an error.
    ///
    /// * Frame **with** `target_peer_id`: if the target is registered, a
    ///   coordination session is recorded and a PUNCH_ME_NOW frame naming
    ///   `from_peer` as the target peer is returned. An unknown target yields
    ///   `Ok(None)`.
    /// * Frame **without** `target_peer_id`: a session matching the sender and
    ///   round is marked ready; once both peers are ready the session moves to
    ///   `Punching`. Always returns `Ok(None)`.
    ///
    /// # Errors
    ///
    /// [`NatTraversalError::RateLimitExceeded`] on rate limiting, or the error
    /// produced by the address/frame validators.
    pub(crate) fn process_punch_me_now_frame(
        &mut self,
        from_peer: PeerId,
        source_addr: SocketAddr,
        frame: &crate::frame::PunchMeNow,
        now: Instant,
    ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
        // Check 1: adaptive per-peer rate limit.
        if self.security_validator.is_adaptive_rate_limited(from_peer, now) {
            self.stats.security_rejections += 1;
            debug!("PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
                   hex::encode(&from_peer[..8]));
            return Err(NatTraversalError::RateLimitExceeded);
        }

        // Check 2: the address claimed in the frame, validated against the packet source.
        self.security_validator.enhanced_address_validation(frame.local_address, source_addr, now)
            .map_err(|e| {
                self.stats.security_rejections += 1;
                debug!("PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
                       hex::encode(&from_peer[..8]), e);
                e
            })?;

        // Check 3: frame-level validation.
        self.security_validator.validate_punch_me_now_frame(frame, source_addr, from_peer, now)
            .map_err(|e| {
                self.stats.security_rejections += 1;
                debug!("PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
                       hex::encode(&from_peer[..8]), e);
                e
            })?;

        if let Some(target_peer_id) = frame.target_peer_id {
            // Coordination request: the sender names the peer it wants to reach.
            if let Some(target_peer) = self.peer_registry.get(&target_peer_id) {
                let session_id = self.generate_session_id();

                // Only create a session when the generated id is not already in use.
                if !self.coordination_sessions.contains_key(&session_id) {
                    // NOTE(review): the RTT estimate is computed but never used.
                    let _network_rtt = self.estimate_peer_rtt(&from_peer)
                        .unwrap_or(Duration::from_millis(100));

                    let session = CoordinationSession {
                        session_id,
                        peer_a: from_peer,
                        peer_b: target_peer_id,
                        current_round: frame.round,
                        started_at: now,
                        phase: CoordinationPhase::Requesting,
                        target_addresses: vec![(frame.local_address, frame.target_sequence)],
                        sync_state: SynchronizationState {
                            // The requester is ready by definition; the target is not yet.
                            peer_a_ready: true,
                            peer_b_ready: false,
                        },
                        stats: CoordinationSessionStats::default(),
                    };

                    self.coordination_sessions.insert(session_id, session);
                    self.stats.total_coordinations += 1;
                    self.stats.active_sessions = self.coordination_sessions.len();
                }

                // Outgoing frame: same round and sequence, the target's observed
                // address as `local_address`, and the requester as target peer.
                let coordination_frame = crate::frame::PunchMeNow {
                    round: frame.round,
                    target_sequence: frame.target_sequence,
                    local_address: target_peer.observed_address,
                    target_peer_id: Some(from_peer),
                };

                info!("Coordinating hole punch between {:?} and {:?} (round: {})",
                      from_peer, target_peer_id, frame.round);

                Ok(Some(coordination_frame))
            } else {
                warn!("Target peer {:?} not found for coordination from {:?}",
                      target_peer_id, from_peer);
                Ok(None)
            }
        } else {
            // Response path: no target peer given, so match the sender to an
            // existing session for this round.
            let session_id = if let Some(session) = self.find_coordination_session_by_peer(from_peer, frame.round) {
                // NOTE(review): the lookup matches the sender as either peer_a or
                // peer_b, but `peer_b_ready` is set in both cases — confirm intended.
                session.sync_state.peer_b_ready = true;

                if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
                    session.phase = CoordinationPhase::Punching;
                    session.stats.successful_coordinations += 1;
                    Some(session.session_id)
                } else {
                    None
                }
            } else {
                None
            };

            // Global stats are updated after the mutable session borrow has ended.
            if let Some(session_id) = session_id {
                self.stats.successful_coordinations += 1;
                info!("Coordination complete for session {} (round: {})",
                      session_id, frame.round);
            }

            Ok(None)
        }
    }
3626
3627 fn find_coordination_session_by_peer(
3629 &mut self,
3630 peer_id: PeerId,
3631 round: VarInt,
3632 ) -> Option<&mut CoordinationSession> {
3633 self.coordination_sessions.values_mut().find(|session| {
3634 (session.peer_a == peer_id || session.peer_b == peer_id) &&
3635 session.current_round == round
3636 })
3637 }
3638
3639 fn generate_session_id(&self) -> CoordinationSessionId {
3641 rand::random()
3642 }
3643
    /// Returns a coordination round number produced by the security validator.
    ///
    /// Pure delegation to `SecurityValidationState::generate_secure_coordination_round`.
    #[allow(dead_code)]
    pub(crate) fn generate_secure_coordination_round(&self) -> VarInt {
        self.security_validator.generate_secure_coordination_round()
    }
3649
3650 #[allow(dead_code)] pub(crate) fn validate_coordination_security(
3653 &mut self,
3654 peer_id: PeerId,
3655 source_addr: SocketAddr,
3656 target_addr: SocketAddr,
3657 now: Instant,
3658 ) -> Result<(), NatTraversalError> {
3659 if self.security_validator.is_adaptive_rate_limited(peer_id, now) {
3661 self.stats.security_rejections += 1;
3662 return Err(NatTraversalError::RateLimitExceeded);
3663 }
3664
3665 self.security_validator.enhanced_address_validation(target_addr, source_addr, now)?;
3667
3668 self.security_validator.validate_amplification_limits(source_addr, target_addr, now)?;
3670
3671 Ok(())
3672 }
3673
3674 #[allow(dead_code)] pub(crate) fn cleanup_expired_sessions(&mut self, now: Instant) {
3677 let session_timeout = Duration::from_secs(300); let expired_sessions: Vec<CoordinationSessionId> = self.coordination_sessions
3681 .iter()
3682 .filter(|(_, session)| {
3683 now.duration_since(session.started_at) > session_timeout
3684 })
3685 .map(|(&session_id, _)| session_id)
3686 .collect();
3687
3688 for session_id in expired_sessions {
3690 if let Some(session) = self.coordination_sessions.remove(&session_id) {
3691 debug!("Cleaned up expired coordination session {} between {:?} and {:?}",
3692 session_id, hex::encode(&session.peer_a[..8]), hex::encode(&session.peer_b[..8]));
3693 }
3694 }
3695
3696 self.stats.active_sessions = self.coordination_sessions.len();
3698
3699 let observation_timeout = Duration::from_secs(3600); self.peer_registry.retain(|_, record| {
3702 now.duration_since(record.observed_at) <= observation_timeout
3703 });
3704
3705 self.stats.active_peers = self.peer_registry.len();
3707
3708 self.address_observations.retain(|_, observation| {
3710 now.duration_since(observation.first_observed) <= observation_timeout
3711 });
3712 }
3713
    /// Returns a reference to the coordinator's statistics counters.
    #[allow(dead_code)]
    pub(crate) fn get_stats(&self) -> &BootstrapStats {
        &self.stats
    }
3719
3720 #[allow(dead_code)] pub(crate) fn update_peer_coordination_stats(
3723 &mut self,
3724 peer_id: PeerId,
3725 success: bool,
3726 ) {
3727 if let Some(peer_record) = self.peer_registry.get_mut(&peer_id) {
3728 peer_record.coordination_count += 1;
3729
3730 if success {
3731 let alpha = 0.1; peer_record.success_rate = peer_record.success_rate * (1.0 - alpha) + alpha;
3734 } else {
3735 let alpha = 0.1;
3737 peer_record.success_rate = peer_record.success_rate * (1.0 - alpha);
3738 }
3739
3740 if peer_record.success_rate < 0.1 && peer_record.coordination_count > 10 {
3742 peer_record.can_coordinate = false;
3743 warn!("Disabled coordination for peer {:?} due to low success rate: {:.2}",
3744 hex::encode(&peer_id[..8]), peer_record.success_rate);
3745 }
3746 }
3747 }
3748
3749 #[allow(dead_code)] pub(crate) fn poll_session_state_machine(&mut self, now: Instant) -> Vec<CoordinationSessionEvent> {
3755 let mut events = Vec::new();
3756 let mut sessions_to_update = Vec::new();
3757
3758 for (&session_id, session) in &self.coordination_sessions {
3760 if let Some(event) = self.should_advance_session(session, now) {
3761 sessions_to_update.push((session_id, event));
3762 }
3763 }
3764
3765 for (session_id, event) in sessions_to_update {
3767 let session_events = if let Some(session) = self.coordination_sessions.get_mut(&session_id) {
3768 let peer_a = session.peer_a;
3769 let peer_b = session.peer_b;
3770
3771 match Self::advance_session_state_static(session, event, now) {
3772 Ok(session_events) => session_events,
3773 Err(e) => {
3774 warn!("Failed to advance session {} state: {:?}", session_id, e);
3775 session.phase = CoordinationPhase::Failed;
3777 vec![CoordinationSessionEvent::SessionFailed {
3778 session_id,
3779 peer_a,
3780 peer_b,
3781 reason: format!("State advancement error: {:?}", e),
3782 }]
3783 }
3784 }
3785 } else {
3786 Vec::new()
3787 };
3788
3789 events.extend(session_events);
3790 }
3791
3792 self.cleanup_completed_sessions(now);
3794
3795 events
3796 }
3797
3798 #[allow(dead_code)] fn should_advance_session(&self, session: &CoordinationSession, now: Instant) -> Option<SessionAdvancementEvent> {
3801 let session_age = now.duration_since(session.started_at);
3802
3803 match session.phase {
3804 CoordinationPhase::Requesting => {
3805 if session_age > Duration::from_secs(10) {
3807 Some(SessionAdvancementEvent::Timeout)
3808 } else if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3809 Some(SessionAdvancementEvent::BothPeersReady)
3810 } else {
3811 None
3812 }
3813 }
3814 CoordinationPhase::Coordinating => {
3815 if session_age > Duration::from_millis(500) {
3817 Some(SessionAdvancementEvent::CoordinationComplete)
3818 } else {
3819 None
3820 }
3821 }
3822 CoordinationPhase::Preparing => {
3823 if session_age > Duration::from_secs(1) {
3825 Some(SessionAdvancementEvent::PreparationComplete)
3826 } else {
3827 None
3828 }
3829 }
3830 CoordinationPhase::Punching => {
3831 if session_age > Duration::from_secs(2) {
3833 Some(SessionAdvancementEvent::PunchingComplete)
3834 } else {
3835 None
3836 }
3837 }
3838 CoordinationPhase::Validating => {
3839 if session_age > Duration::from_secs(10) {
3841 Some(SessionAdvancementEvent::ValidationTimeout)
3842 } else {
3843 None
3844 }
3845 }
3846 CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
3847 if session_age > Duration::from_secs(60) {
3849 Some(SessionAdvancementEvent::ReadyForCleanup)
3850 } else {
3851 None
3852 }
3853 }
3854 CoordinationPhase::Idle => {
3855 Some(SessionAdvancementEvent::Timeout)
3857 }
3858 }
3859 }
3860
3861 #[allow(dead_code)] fn advance_session_state_static(
3864 session: &mut CoordinationSession,
3865 event: SessionAdvancementEvent,
3866 _now: Instant,
3867 ) -> Result<Vec<CoordinationSessionEvent>, NatTraversalError> {
3868 let mut events = Vec::new();
3869 let previous_phase = session.phase;
3870
3871 match (session.phase, event) {
3872 (CoordinationPhase::Requesting, SessionAdvancementEvent::BothPeersReady) => {
3873 session.phase = CoordinationPhase::Coordinating;
3874 debug!("Session {} advanced from Requesting to Coordinating", session.session_id);
3875 events.push(CoordinationSessionEvent::PhaseChanged {
3876 session_id: session.session_id,
3877 old_phase: previous_phase,
3878 new_phase: session.phase,
3879 });
3880 }
3881 (CoordinationPhase::Requesting, SessionAdvancementEvent::Timeout) => {
3882 session.phase = CoordinationPhase::Failed;
3883 warn!("Session {} timed out in Requesting phase", session.session_id);
3884 events.push(CoordinationSessionEvent::SessionFailed {
3885 session_id: session.session_id,
3886 peer_a: session.peer_a,
3887 peer_b: session.peer_b,
3888 reason: "Timeout waiting for peer responses".to_string(),
3889 });
3890 }
3891 (CoordinationPhase::Coordinating, SessionAdvancementEvent::CoordinationComplete) => {
3892 session.phase = CoordinationPhase::Preparing;
3893 debug!("Session {} advanced from Coordinating to Preparing", session.session_id);
3894 events.push(CoordinationSessionEvent::PhaseChanged {
3895 session_id: session.session_id,
3896 old_phase: previous_phase,
3897 new_phase: session.phase,
3898 });
3899 }
3900 (CoordinationPhase::Preparing, SessionAdvancementEvent::PreparationComplete) => {
3901 session.phase = CoordinationPhase::Punching;
3902 debug!("Session {} advanced from Preparing to Punching", session.session_id);
3903 events.push(CoordinationSessionEvent::PhaseChanged {
3904 session_id: session.session_id,
3905 old_phase: previous_phase,
3906 new_phase: session.phase,
3907 });
3908 events.push(CoordinationSessionEvent::StartHolePunching {
3909 session_id: session.session_id,
3910 peer_a: session.peer_a,
3911 peer_b: session.peer_b,
3912 target_addresses: session.target_addresses.clone(),
3913 });
3914 }
3915 (CoordinationPhase::Punching, SessionAdvancementEvent::PunchingComplete) => {
3916 session.phase = CoordinationPhase::Validating;
3917 debug!("Session {} advanced from Punching to Validating", session.session_id);
3918 events.push(CoordinationSessionEvent::PhaseChanged {
3919 session_id: session.session_id,
3920 old_phase: previous_phase,
3921 new_phase: session.phase,
3922 });
3923 }
3924 (CoordinationPhase::Validating, SessionAdvancementEvent::ValidationTimeout) => {
3925 session.phase = CoordinationPhase::Failed;
3926 warn!("Session {} validation timed out", session.session_id);
3927 events.push(CoordinationSessionEvent::SessionFailed {
3928 session_id: session.session_id,
3929 peer_a: session.peer_a,
3930 peer_b: session.peer_b,
3931 reason: "Validation timeout".to_string(),
3932 });
3933 }
3934 (phase, SessionAdvancementEvent::ReadyForCleanup) => {
3935 debug!("Session {} ready for cleanup in phase {:?}", session.session_id, phase);
3936 events.push(CoordinationSessionEvent::ReadyForCleanup {
3937 session_id: session.session_id,
3938 });
3939 }
3940 _ => {
3941 warn!("Invalid state transition for session {}: {:?} -> {:?}",
3943 session.session_id, session.phase, event);
3944 }
3945 }
3946
3947 Ok(events)
3948 }
3949
3950 #[allow(dead_code)] fn cleanup_completed_sessions(&mut self, now: Instant) {
3953 let cleanup_timeout = Duration::from_secs(300); let sessions_to_remove: Vec<CoordinationSessionId> = self.coordination_sessions
3956 .iter()
3957 .filter(|(_, session)| {
3958 matches!(session.phase, CoordinationPhase::Succeeded | CoordinationPhase::Failed) &&
3959 now.duration_since(session.started_at) > cleanup_timeout
3960 })
3961 .map(|(&session_id, _)| session_id)
3962 .collect();
3963
3964 for session_id in sessions_to_remove {
3965 if let Some(session) = self.coordination_sessions.remove(&session_id) {
3966 debug!("Cleaned up completed session {} in phase {:?}",
3967 session_id, session.phase);
3968 }
3969 }
3970
3971 self.stats.active_sessions = self.coordination_sessions.len();
3972 }
3973
3974 #[allow(dead_code)] pub(crate) fn retry_failed_coordination(
3980 &mut self,
3981 session_id: CoordinationSessionId,
3982 now: Instant,
3983 ) -> Result<bool, NatTraversalError> {
3984 let session = self.coordination_sessions.get_mut(&session_id)
3985 .ok_or(NatTraversalError::NoActiveCoordination)?;
3986
3987 if !matches!(session.phase, CoordinationPhase::Failed) {
3989 return Ok(false);
3990 }
3991
3992 let base_delay = Duration::from_secs(1);
3994 let max_delay = Duration::from_secs(60);
3995 let retry_count = session.stats.successful_coordinations; let delay = std::cmp::min(
3998 base_delay * 2_u32.pow(retry_count.min(10)), max_delay
4000 );
4001
4002 let _jitter_factor = 0.1;
4004 let jitter = Duration::from_millis((rand::random::<u64>() % 100) * delay.as_millis() as u64 / 1000);
4005 let total_delay = delay + jitter;
4006
4007 if now.duration_since(session.started_at) < total_delay {
4009 return Ok(false);
4010 }
4011
4012 const MAX_RETRIES: u32 = 5;
4014 if retry_count >= MAX_RETRIES {
4015 warn!("Session {} exceeded maximum retry attempts ({})", session_id, MAX_RETRIES);
4016 return Ok(false);
4017 }
4018
4019 session.phase = CoordinationPhase::Requesting;
4021 session.started_at = now;
4022 session.sync_state.peer_a_ready = false;
4023 session.sync_state.peer_b_ready = false;
4024 session.stats.successful_coordinations += 1; info!("Retrying coordination session {} (attempt {})", session_id, retry_count + 1);
4027 Ok(true)
4028 }
4029
4030 #[allow(dead_code)] pub(crate) fn handle_coordination_error(
4033 &mut self,
4034 session_id: CoordinationSessionId,
4035 error: NatTraversalError,
4036 _now: Instant,
4037 ) -> CoordinationRecoveryAction {
4038 let session = match self.coordination_sessions.get_mut(&session_id) {
4039 Some(session) => session,
4040 None => return CoordinationRecoveryAction::NoAction,
4041 };
4042
4043 match error {
4044 NatTraversalError::RateLimitExceeded => {
4045 warn!("Rate limit exceeded for session {}, will retry", session_id);
4047 CoordinationRecoveryAction::RetryWithBackoff
4048 }
4049 NatTraversalError::SecurityValidationFailed | NatTraversalError::SuspiciousCoordination => {
4050 session.phase = CoordinationPhase::Failed;
4052 warn!("Security validation failed for session {}, marking as failed", session_id);
4053 CoordinationRecoveryAction::MarkAsFailed
4054 }
4055 NatTraversalError::InvalidAddress => {
4056 warn!("Invalid address in session {}, allowing retry", session_id);
4058 CoordinationRecoveryAction::RetryWithBackoff
4059 }
4060 NatTraversalError::NoActiveCoordination => {
4061 warn!("No active coordination for session {}, cleaning up", session_id);
4063 CoordinationRecoveryAction::Cleanup
4064 }
4065 _ => {
4066 warn!("Coordination error for session {}: {:?}, will retry", session_id, error);
4068 CoordinationRecoveryAction::RetryWithBackoff
4069 }
4070 }
4071 }
4072
4073 fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
4075 if let Some(_peer_record) = self.peer_registry.get(peer_id) {
4078 Some(Duration::from_millis(100))
4080 } else {
4081 None
4082 }
4083 }
4084
4085
4086 #[allow(dead_code)] pub(crate) fn coordinate_hole_punching(
4092 &mut self,
4093 peer_a: PeerId,
4094 peer_b: PeerId,
4095 round: VarInt,
4096 now: Instant,
4097 ) -> Result<CoordinationSessionId, NatTraversalError> {
4098 let peer_a_record = self.peer_registry.get(&peer_a)
4100 .ok_or(NatTraversalError::UnknownCandidate)?;
4101 let peer_b_record = self.peer_registry.get(&peer_b)
4102 .ok_or(NatTraversalError::UnknownCandidate)?;
4103
4104 if !peer_a_record.can_coordinate || !peer_b_record.can_coordinate {
4105 return Err(NatTraversalError::InvalidCandidateState);
4106 }
4107
4108 let session_id = self.generate_session_id();
4110
4111 let session = CoordinationSession {
4113 session_id,
4114 peer_a,
4115 peer_b,
4116 current_round: round,
4117 started_at: now,
4118 phase: CoordinationPhase::Requesting,
4119 target_addresses: vec![
4120 (peer_a_record.observed_address, VarInt::from_u32(0)),
4121 (peer_b_record.observed_address, VarInt::from_u32(1)),
4122 ],
4123 sync_state: SynchronizationState {
4124 peer_a_ready: false,
4125 peer_b_ready: false,
4126 },
4127 stats: CoordinationSessionStats::default(),
4128 };
4129
4130 self.coordination_sessions.insert(session_id, session);
4131 self.stats.total_coordinations += 1;
4132 self.stats.active_sessions = self.coordination_sessions.len();
4133
4134 info!("Started coordination session {} between peers {:?} and {:?} (round: {})",
4135 session_id, hex::encode(&peer_a[..8]), hex::encode(&peer_b[..8]), round);
4136
4137 Ok(session_id)
4138 }
4139
4140 #[allow(dead_code)] pub(crate) fn relay_coordination_frame(
4146 &mut self,
4147 session_id: CoordinationSessionId,
4148 from_peer: PeerId,
4149 frame: &crate::frame::PunchMeNow,
4150 _now: Instant,
4151 ) -> Result<Option<(PeerId, crate::frame::PunchMeNow)>, NatTraversalError> {
4152 let session = self.coordination_sessions.get_mut(&session_id)
4153 .ok_or(NatTraversalError::NoActiveCoordination)?;
4154
4155 if session.peer_a != from_peer && session.peer_b != from_peer {
4157 return Err(NatTraversalError::SuspiciousCoordination);
4158 }
4159
4160 let target_peer = if session.peer_a == from_peer {
4162 session.peer_b
4163 } else {
4164 session.peer_a
4165 };
4166
4167 let target_record = self.peer_registry.get(&target_peer)
4169 .ok_or(NatTraversalError::UnknownCandidate)?;
4170
4171 if session.peer_a == from_peer {
4173 session.sync_state.peer_a_ready = true;
4174 } else {
4175 session.sync_state.peer_b_ready = true;
4176 }
4177
4178 let relay_frame = crate::frame::PunchMeNow {
4180 round: frame.round,
4181 target_sequence: frame.target_sequence,
4182 local_address: target_record.observed_address,
4183 target_peer_id: Some(from_peer),
4184 };
4185
4186 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4188 session.phase = CoordinationPhase::Coordinating;
4189 info!("Coordination phase complete for session {} - both peers ready", session_id);
4190 }
4191
4192 debug!("Relaying coordination frame from {:?} to {:?} in session {}",
4193 hex::encode(&from_peer[..8]), hex::encode(&target_peer[..8]), session_id);
4194
4195 Ok(Some((target_peer, relay_frame)))
4196 }
4197
4198 #[allow(dead_code)] pub(crate) fn advance_coordination_round(
4204 &mut self,
4205 session_id: CoordinationSessionId,
4206 now: Instant,
4207 ) -> Result<CoordinationPhase, NatTraversalError> {
4208 let session = self.coordination_sessions.get_mut(&session_id)
4209 .ok_or(NatTraversalError::NoActiveCoordination)?;
4210
4211 let previous_phase = session.phase;
4212
4213 match session.phase {
4215 CoordinationPhase::Requesting => {
4216 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4218 session.phase = CoordinationPhase::Coordinating;
4219 debug!("Session {} advanced to Coordinating phase", session_id);
4220 }
4221 }
4222 CoordinationPhase::Coordinating => {
4223 let coordination_delay = Duration::from_millis(200); let punch_time = now + coordination_delay;
4226
4227 session.phase = CoordinationPhase::Preparing;
4228 debug!("Session {} advanced to Preparing phase, punch time: {:?}",
4229 session_id, punch_time);
4230 }
4231 CoordinationPhase::Preparing => {
4232 session.phase = CoordinationPhase::Punching;
4234 debug!("Session {} advanced to Punching phase", session_id);
4235 }
4236 CoordinationPhase::Punching => {
4237 session.phase = CoordinationPhase::Validating;
4239 debug!("Session {} advanced to Validating phase", session_id);
4240 }
4241 CoordinationPhase::Validating => {
4242 let validation_timeout = Duration::from_secs(5);
4244 if now.duration_since(session.started_at) > validation_timeout {
4245 session.phase = CoordinationPhase::Failed;
4246 debug!("Session {} timed out in validation", session_id);
4247 }
4248 }
4249 CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
4250 }
4252 CoordinationPhase::Idle => {
4253 session.phase = CoordinationPhase::Requesting;
4255 }
4256 }
4257
4258 if session.phase != previous_phase {
4260 match session.phase {
4261 CoordinationPhase::Succeeded => {
4262 session.stats.successful_coordinations += 1;
4263 self.stats.successful_coordinations += 1;
4264 }
4265 CoordinationPhase::Failed => {
4266 }
4268 _ => {}
4269 }
4270 }
4271
4272 Ok(session.phase)
4273 }
4274
4275 #[allow(dead_code)] pub(crate) fn get_coordination_session(&self, session_id: CoordinationSessionId) -> Option<&CoordinationSession> {
4278 self.coordination_sessions.get(&session_id)
4279 }
4280
4281 #[allow(dead_code)] pub(crate) fn get_coordination_session_mut(&mut self, session_id: CoordinationSessionId) -> Option<&mut CoordinationSession> {
4284 self.coordination_sessions.get_mut(&session_id)
4285 }
4286
4287 #[allow(dead_code)] pub(crate) fn mark_coordination_success(
4290 &mut self,
4291 session_id: CoordinationSessionId,
4292 _now: Instant,
4293 ) -> Result<(), NatTraversalError> {
4294 let session = self.coordination_sessions.get_mut(&session_id)
4295 .ok_or(NatTraversalError::NoActiveCoordination)?;
4296
4297 session.phase = CoordinationPhase::Succeeded;
4298 session.stats.successful_coordinations += 1;
4299 self.stats.successful_coordinations += 1;
4300
4301 if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4303 peer_a_record.coordination_count += 1;
4304 peer_a_record.success_rate = (peer_a_record.success_rate * (peer_a_record.coordination_count - 1) as f64 + 1.0) / peer_a_record.coordination_count as f64;
4305 }
4306
4307 if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4308 peer_b_record.coordination_count += 1;
4309 peer_b_record.success_rate = (peer_b_record.success_rate * (peer_b_record.coordination_count - 1) as f64 + 1.0) / peer_b_record.coordination_count as f64;
4310 }
4311
4312 info!("Coordination session {} marked as successful", session_id);
4313 Ok(())
4314 }
4315
4316 #[allow(dead_code)] pub(crate) fn mark_coordination_failure(
4319 &mut self,
4320 session_id: CoordinationSessionId,
4321 reason: &str,
4322 _now: Instant,
4323 ) -> Result<(), NatTraversalError> {
4324 let session = self.coordination_sessions.get_mut(&session_id)
4325 .ok_or(NatTraversalError::NoActiveCoordination)?;
4326
4327 session.phase = CoordinationPhase::Failed;
4328
4329 if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4331 peer_a_record.coordination_count += 1;
4332 peer_a_record.success_rate = (peer_a_record.success_rate * (peer_a_record.coordination_count - 1) as f64) / peer_a_record.coordination_count as f64;
4333 }
4334
4335 if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4336 peer_b_record.coordination_count += 1;
4337 peer_b_record.success_rate = (peer_b_record.success_rate * (peer_b_record.coordination_count - 1) as f64) / peer_b_record.coordination_count as f64;
4338 }
4339
4340 warn!("Coordination session {} failed: {}", session_id, reason);
4341 Ok(())
4342 }
4343
4344 pub(crate) fn get_peer_record(&self, peer_id: PeerId) -> Option<&PeerObservationRecord> {
4346 self.peer_registry.get(&peer_id)
4347 }
4348
4349}
4350
4351impl Default for BootstrapConfig {
4352 fn default() -> Self {
4353 Self {
4354 _unused: (),
4355 }
4356 }
4357}
4358
4359#[derive(Debug)]
4365#[allow(dead_code)] pub(super) struct MultiDestinationTransmitter {
4367 active_targets: Vec<MultiDestPunchTarget>,
4369 stats: MultiDestTransmissionStats,
4371 max_targets: usize,
4373 rate_limiter: TransmissionRateLimiter,
4375 target_selector: AdaptiveTargetSelector,
4377 performance_monitor: TransmissionPerformanceMonitor,
4379}
4380
4381
4382#[derive(Debug, Default, Clone)]
4384pub(super) struct MultiDestTransmissionStats {
4385 _unused: (),
4386}
4387
/// Rate limiter for transmissions.
///
/// Currently carries no data beyond a unit placeholder field.
#[derive(Debug)]
struct TransmissionRateLimiter {
    /// Placeholder so the struct keeps a named-field shape.
    _unused: (),
}
4393
4394
/// Target selector for transmissions.
///
/// Currently carries no data beyond a unit placeholder field.
#[derive(Debug)]
struct AdaptiveTargetSelector {
    /// Placeholder so the struct keeps a named-field shape.
    _unused: (),
}
4400
4401
/// Performance monitor for transmissions.
///
/// Currently carries no data beyond a unit placeholder field.
#[derive(Debug)]
struct TransmissionPerformanceMonitor {
    /// Placeholder so the struct keeps a named-field shape.
    _unused: (),
}
4407
4408impl MultiDestinationTransmitter {
4409 pub(super) fn new() -> Self {
4411 Self {
4412 active_targets: Vec::new(),
4413 stats: MultiDestTransmissionStats::default(),
4414 max_targets: 8, rate_limiter: TransmissionRateLimiter::new(100, 50), target_selector: AdaptiveTargetSelector::new(),
4417 performance_monitor: TransmissionPerformanceMonitor::new(),
4418 }
4419 }
4420
4421
4422
4423
4424
4425
4426
4427
4428}
4429
4430
4431
4432impl TransmissionRateLimiter {
4433 fn new(_max_pps: u64, _burst_size: u64) -> Self {
4434 Self {
4435 _unused: (),
4436 }
4437 }
4438
4439}
4440
4441
4442impl AdaptiveTargetSelector {
4443 fn new() -> Self {
4444 Self {
4445 _unused: (),
4446 }
4447 }
4448
4449
4450}
4451
4452impl TransmissionPerformanceMonitor {
4453 fn new() -> Self {
4454 Self {
4455 _unused: (),
4456 }
4457 }
4458
4459}
4460
4461