ant_quic/connection/
nat_traversal.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    collections::{HashMap, VecDeque},
10    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
11    time::Duration,
12};
13
14use crate::shared::ConnectionId;
15use tracing::{debug, info, trace, warn};
16
17use crate::{Instant, VarInt};
18
/// NAT traversal state for a QUIC connection
///
/// This manages address candidate discovery, validation, and coordination
/// for establishing direct P2P connections through NATs.
///
/// v0.13.0: All nodes are symmetric P2P nodes - no role distinction.
/// Every node can initiate, accept, and coordinate NAT traversal.
///
/// The struct is a plain aggregate: candidate tables, the derived pair list,
/// in-flight validations, the optional coordination round, and the helper
/// sub-states (statistics, security, network monitoring, resource cleanup).
#[derive(Debug)]
pub(super) struct NatTraversalState {
    // v0.13.0: role field removed - all nodes are symmetric P2P nodes
    /// Candidate addresses we've advertised to the peer
    ///
    /// NOTE(review): the `VarInt` key is presumably the advertisement sequence
    /// number - confirm against the insertion site.
    pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
    /// Candidate addresses received from the peer
    ///
    /// NOTE(review): keyed like `local_candidates`, presumably by the sequence
    /// number carried with the peer's advertisement - confirm.
    pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
    /// Generated candidate pairs for connectivity testing
    pub(super) candidate_pairs: Vec<CandidatePair>,
    /// Index for fast pair lookup by remote address (maintained during generation)
    ///
    /// NOTE(review): the `usize` value is presumably a position in
    /// `candidate_pairs`; it must be rebuilt whenever that vector is reordered.
    pub(super) pair_index: HashMap<SocketAddr, usize>,
    /// Currently active path validation attempts, keyed by the address being validated
    pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
    /// Coordination state for simultaneous hole punching (`None` when no round is tracked)
    pub(super) coordination: Option<CoordinationState>,
    /// Sequence number for address advertisements
    pub(super) next_sequence: VarInt,
    /// Maximum candidates we're willing to handle
    pub(super) max_candidates: u32,
    /// Timeout for coordination rounds
    pub(super) coordination_timeout: Duration,
    /// Statistics for this NAT traversal session
    pub(super) stats: NatTraversalStats,
    /// Security validation state (rate limiting and address checks)
    pub(super) security_state: SecurityValidationState,
    /// Network condition monitoring for adaptive timeouts
    pub(super) network_monitor: NetworkConditionMonitor,
    /// Resource management and cleanup coordinator
    pub(super) resource_manager: ResourceCleanupCoordinator,
    /// Coordination support - all nodes can coordinate (v0.13.0: always enabled)
    ///
    /// NOTE(review): the type is still `Option`, so "always enabled" is a
    /// convention of the constructor rather than something the type enforces.
    pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
}
58// v0.13.0: NatTraversalRole enum removed - all nodes are symmetric P2P nodes
59// Every node can initiate, accept, and coordinate NAT traversal without role distinction.
/// Address candidate with metadata
///
/// One entry describes a single socket address together with the bookkeeping
/// used to rank it (`priority`), record where it came from (`source`), and
/// track validation progress (`state`, `attempt_count`, `last_attempt`).
#[derive(Debug, Clone)]
pub(super) struct AddressCandidate {
    /// The socket address
    pub(super) address: SocketAddr,
    /// Priority for ICE-like selection (higher = better)
    pub(super) priority: u32,
    /// How this candidate was discovered
    pub(super) source: CandidateSource,
    /// When this candidate was first learned
    pub(super) discovered_at: Instant,
    /// Current state of this candidate
    pub(super) state: CandidateState,
    /// Number of validation attempts for this candidate
    pub(super) attempt_count: u32,
    /// Last validation attempt time (`None` until the first attempt)
    pub(super) last_attempt: Option<Instant>,
}
/// How an address candidate was discovered
///
/// `classify_candidate_type` in this file maps these sources onto ICE-style
/// candidate types: `Local` becomes a host candidate, `Observed` and
/// `Predicted` become server-reflexive, and `Peer` becomes peer-reflexive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateSource {
    /// Local network interface
    Local,
    /// Observed by a bootstrap node
    ///
    /// When present, `by_node` identifies the coordinator that reported the
    /// observation using its node identifier.
    Observed {
        /// Identifier of the coordinator that observed our address
        /// (`None` when the observer is not known)
        by_node: Option<VarInt>,
    },
    /// Received from peer via AddAddress frame
    Peer,
    /// Generated prediction for symmetric NAT
    Predicted,
}
/// Current state of a candidate address
///
/// Stored in `AddressCandidate::state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateState {
    /// Newly discovered, not yet tested
    New,
    /// Currently being validated
    Validating,
    /// Successfully validated and usable
    Valid,
    /// Validation failed
    Failed,
    /// Removed by peer or expired
    Removed,
}
/// State of an individual path validation attempt
///
/// Instances are the values of `NatTraversalState::active_validations`,
/// which is keyed by the socket address under validation.
#[derive(Debug)]
// NOTE(review): dead_code is allowed presumably because some fields are not read yet - confirm.
#[allow(dead_code)]
pub(super) struct PathValidationState {
    /// Challenge value sent
    pub(super) challenge: u64,
    /// When the challenge was sent
    pub(super) sent_at: Instant,
    /// Number of retransmissions
    pub(super) retry_count: u32,
    /// Maximum retries allowed
    pub(super) max_retries: u32,
    /// Associated with a coordination round (if any)
    pub(super) coordination_round: Option<VarInt>,
    /// Adaptive timeout state
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Last retry attempt time (`None` until the first retry)
    pub(super) last_retry_at: Option<Instant>,
}
/// Coordination state for simultaneous hole punching
///
/// Describes one coordination round: which targets are punched, the timing of
/// the round, the current `CoordinationPhase`, and retry/timeout bookkeeping.
#[derive(Debug)]
// NOTE(review): dead_code is allowed presumably because some fields are not read yet - confirm.
#[allow(dead_code)]
pub(super) struct CoordinationState {
    /// Current coordination round number
    pub(super) round: VarInt,
    /// Addresses we're punching to in this round
    pub(super) punch_targets: Vec<PunchTarget>,
    /// When this round started (coordination phase)
    pub(super) round_start: Instant,
    /// When hole punching should begin (synchronized time)
    pub(super) punch_start: Instant,
    /// Duration of this coordination round
    pub(super) round_duration: Duration,
    /// Current state of this coordination round
    pub(super) state: CoordinationPhase,
    /// Whether we've sent our PUNCH_ME_NOW to coordinator
    pub(super) punch_request_sent: bool,
    /// Whether we've received peer's PUNCH_ME_NOW via coordinator
    pub(super) peer_punch_received: bool,
    /// Retry count for this round
    pub(super) retry_count: u32,
    /// Maximum retries before giving up
    pub(super) max_retries: u32,
    /// Adaptive timeout state for coordination
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Last retry attempt time (`None` until the first retry)
    pub(super) last_retry_at: Option<Instant>,
}
/// Phases of the coordination protocol
///
/// Stored in `CoordinationState::state`.
///
/// NOTE(review): the variants appear to be listed in the order a round
/// progresses, ending in `Succeeded` or `Failed` - confirm against the state
/// machine that drives the transitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// NOTE(review): dead_code is allowed presumably because not every phase is constructed yet - confirm.
#[allow(dead_code)]
pub(crate) enum CoordinationPhase {
    /// Waiting to start coordination
    Idle,
    /// Sending PUNCH_ME_NOW to coordinator
    Requesting,
    /// Waiting for peer's PUNCH_ME_NOW via coordinator
    Coordinating,
    /// Grace period before synchronized hole punching
    Preparing,
    /// Actively sending PATH_CHALLENGE packets
    Punching,
    /// Waiting for PATH_RESPONSE validation
    Validating,
    /// This round completed successfully
    Succeeded,
    /// This round failed, may retry
    Failed,
}
/// Target for hole punching in a coordination round
///
/// Collected in `CoordinationState::punch_targets`.
#[derive(Debug, Clone)]
pub(super) struct PunchTarget {
    /// Remote address to punch to
    pub(super) remote_addr: SocketAddr,
    /// Sequence number of the remote candidate
    pub(super) remote_sequence: VarInt,
    /// Challenge value for validation
    pub(super) challenge: u64,
}
/// Actions to take when handling NAT traversal timeouts
///
/// Each variant names one follow-up step; the code that produces and consumes
/// these values lies outside this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum TimeoutAction {
    /// Retry candidate discovery
    RetryDiscovery,
    /// Retry coordination with bootstrap node
    RetryCoordination,
    /// Start path validation for discovered candidates
    StartValidation,
    /// NAT traversal completed successfully
    Complete,
    /// NAT traversal failed
    Failed,
}
203
/// Candidate pair for ICE-like connectivity testing
///
/// Couples one local address with one remote address. `priority` is a 64-bit
/// value of the kind `calculate_pair_priority` produces, and `pair_type` is
/// one of the classes `classify_pair_type` returns.
#[derive(Debug, Clone)]
// NOTE(review): dead_code is allowed presumably because some fields are not read yet - confirm.
#[allow(dead_code)]
pub(super) struct CandidatePair {
    /// Sequence of remote candidate
    pub(super) remote_sequence: VarInt,
    /// Our local address for this pair
    pub(super) local_addr: SocketAddr,
    /// Remote address we're testing connectivity to
    pub(super) remote_addr: SocketAddr,
    /// Combined priority for pair ordering (higher = better)
    pub(super) priority: u64,
    /// Current state of this pair
    pub(super) state: PairState,
    /// Type classification for this pair
    pub(super) pair_type: PairType,
    /// When this pair was created
    pub(super) created_at: Instant,
    /// When validation was last attempted (`None` if never checked)
    pub(super) last_check: Option<Instant>,
}
/// State of a candidate pair during validation
///
/// Stored in `CandidatePair::state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// NOTE(review): dead_code is allowed presumably because not every state is constructed yet - confirm.
#[allow(dead_code)]
pub(super) enum PairState {
    /// Waiting to be tested
    Waiting,
    /// Validation succeeded - this pair works
    Succeeded,
    /// Validation failed
    Failed,
    /// Temporarily frozen (waiting for other pairs)
    Frozen,
}
/// Type classification for candidate pairs (based on ICE)
///
/// Produced by `classify_pair_type` from the local and remote candidate
/// types. Derives `Hash`, so it can be used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum PairType {
    /// Both candidates are on local network
    HostToHost,
    /// Local is host, remote is server reflexive (through NAT)
    HostToServerReflexive,
    /// Local is server reflexive, remote is host
    ServerReflexiveToHost,
    /// Both are server reflexive (both behind NAT)
    ServerReflexiveToServerReflexive,
    /// One side is peer reflexive (learned from peer)
    PeerReflexive,
}
/// Type of address candidate (following ICE terminology)
///
/// Derived from a `CandidateSource` by `classify_candidate_type`, and used by
/// `calculate_candidate_priority` to select the type preference.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CandidateType {
    /// Host candidate - directly reachable local interface
    Host,
    /// Server reflexive - public address observed by bootstrap node
    ServerReflexive,
    /// Peer reflexive - address learned from incoming packets
    PeerReflexive,
}
262
263/// Calculate ICE-like priority for an address candidate
264/// Based on RFC 8445 Section 5.1.2.1
265#[allow(dead_code)]
266fn calculate_candidate_priority(
267    candidate_type: CandidateType,
268    local_preference: u16,
269    component_id: u8,
270) -> u32 {
271    let type_preference = match candidate_type {
272        CandidateType::Host => 126,
273        CandidateType::PeerReflexive => 110,
274        CandidateType::ServerReflexive => 100,
275    };
276    // ICE priority formula: (2^24 * type_pref) + (2^8 * local_pref) + component_id
277    (1u32 << 24) * type_preference + (1u32 << 8) * local_preference as u32 + component_id as u32
278}
279
/// Calculate the combined priority of a candidate pair.
///
/// Implements the ICE pair-priority formula from RFC 8445 Section 6.1.2.3:
/// `2^32 * MIN(G, D) + 2 * MAX(G, D) + (G > D ? 1 : 0)`, where `G` is
/// `local_priority` and `D` is `remote_priority`. Higher values are better.
///
/// The result saturates at `u64::MAX`. When both inputs are `u32::MAX` the
/// exact value needs 65 bits; plain arithmetic would panic in debug builds
/// and wrap around to a very low priority in release builds.
fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
    let g = u64::from(local_priority);
    let d = u64::from(remote_priority);
    let (lower, higher) = if g <= d { (g, d) } else { (d, g) };
    // Each shifted term fits in a u64 for any u32 input; only the sums can overflow
    (lower << 32)
        .saturating_add(higher << 1)
        .saturating_add(u64::from(g > d))
}
288
289/// Determine candidate type from source information
290fn classify_candidate_type(source: CandidateSource) -> CandidateType {
291    match source {
292        CandidateSource::Local => CandidateType::Host,
293        CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
294        CandidateSource::Peer => CandidateType::PeerReflexive,
295        CandidateSource::Predicted => CandidateType::ServerReflexive, // Symmetric NAT prediction
296    }
297}
298/// Determine pair type from individual candidate types
299fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
300    match (local_type, remote_type) {
301        (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
302        (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
303        (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
304        (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => {
305            PairType::ServerReflexiveToServerReflexive
306        }
307        (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => {
308            PairType::PeerReflexive
309        }
310    }
311}
312/// Check if two candidates are compatible for pairing
313fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
314    // Must be same address family (IPv4 with IPv4, IPv6 with IPv6)
315    match (local.address, remote.address) {
316        (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
317        (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
318        _ => false, // No IPv4/IPv6 mixing for now
319    }
320}
/// Statistics for NAT traversal attempts
///
/// All counters are `u32` and start at zero through the derived `Default`.
/// The code that increments them lies outside this part of the file.
#[derive(Debug, Default, Clone)]
// NOTE(review): dead_code is allowed presumably because some counters are not read yet - confirm.
#[allow(dead_code)]
pub(crate) struct NatTraversalStats {
    /// Total candidates received from peer
    pub(super) remote_candidates_received: u32,
    /// Total candidates we've advertised
    pub(super) local_candidates_sent: u32,
    /// Successful validations
    pub(super) validations_succeeded: u32,
    /// Failed validations
    pub(super) validations_failed: u32,
    /// Coordination rounds attempted
    pub(super) coordination_rounds: u32,
    /// Successful coordinations
    pub(super) successful_coordinations: u32,
    /// Failed coordinations
    pub(super) failed_coordinations: u32,
    /// Timed out coordinations
    pub(super) timed_out_coordinations: u32,
    /// Coordination failures due to poor network conditions
    pub(super) coordination_failures: u32,
    /// Successful direct connections established
    pub(super) direct_connections: u32,
    /// Security validation rejections
    pub(super) security_rejections: u32,
    /// Rate limiting violations
    pub(super) rate_limit_violations: u32,
    /// Invalid address rejections
    pub(super) invalid_address_rejections: u32,
    /// Suspicious coordination attempts
    pub(super) suspicious_coordination_attempts: u32,
}
/// Security validation state for rate limiting and attack detection
///
/// Holds two sliding-window trackers (candidate additions and coordination
/// requests) that share `rate_window`, plus a cache of address validation
/// verdicts.
#[derive(Debug)]
// NOTE(review): dead_code is allowed presumably because some fields and methods are not used yet - confirm.
#[allow(dead_code)]
pub(super) struct SecurityValidationState {
    /// Rate limiting: track candidate additions per time window
    /// (timestamps are appended at the back and expired from the front)
    candidate_rate_tracker: VecDeque<Instant>,
    /// Maximum candidates per time window
    max_candidates_per_window: u32,
    /// Rate limiting time window (used for both trackers)
    rate_window: Duration,
    /// Coordination request tracking for suspicious patterns
    coordination_requests: VecDeque<CoordinationRequest>,
    /// Maximum coordination requests per time window
    max_coordination_per_window: u32,
    /// Address validation cache to avoid repeated validation
    address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
    /// Cache timeout for address validation
    ///
    /// NOTE(review): cached entries carry no timestamp and `validate_address`
    /// does not consult this value; confirm how `cleanup_address_cache` uses it.
    validation_cache_timeout: Duration,
}
/// Coordination request tracking for security analysis
///
/// Only the arrival time is recorded; `SecurityValidationState` keeps these
/// in a queue and expires them once they fall outside the rate window.
#[derive(Debug, Clone)]
struct CoordinationRequest {
    /// When the request was made
    timestamp: Instant,
}
/// Result of address validation
///
/// `enhanced_address_validation` turns `Invalid` into
/// `NatTraversalError::InvalidAddress` and `Suspicious` into
/// `NatTraversalError::SuspiciousCoordination`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressValidationResult {
    /// Address is valid and safe
    Valid,
    /// Address is invalid (malformed, reserved, etc.)
    Invalid,
    /// Address is suspicious (potential attack)
    Suspicious,
}
/// Adaptive timeout state for network condition awareness
///
/// Embedded in `PathValidationState` and `CoordinationState`. The fields
/// combine timeout bounds, backoff parameters, round-trip-time estimates, and
/// outcome counters; the logic that updates them lies outside this part of
/// the file.
#[derive(Debug, Clone)]
pub(super) struct AdaptiveTimeoutState {
    /// Current timeout value
    current_timeout: Duration,
    /// Minimum allowed timeout
    min_timeout: Duration,
    /// Maximum allowed timeout
    max_timeout: Duration,
    /// Base timeout for exponential backoff
    base_timeout: Duration,
    /// Current backoff multiplier
    backoff_multiplier: f64,
    /// Maximum backoff multiplier
    max_backoff_multiplier: f64,
    /// Jitter factor for randomization
    jitter_factor: f64,
    /// Smoothed round-trip time estimation (`None` until a sample exists)
    srtt: Option<Duration>,
    /// Round-trip time variance (`None` until a sample exists)
    rttvar: Option<Duration>,
    /// Last successful round-trip time
    last_rtt: Option<Duration>,
    /// Number of consecutive timeouts
    consecutive_timeouts: u32,
    /// Number of successful responses
    successful_responses: u32,
}
/// Network condition monitoring for adaptive behavior
///
/// Stored in `NatTraversalState::network_monitor`.
#[derive(Debug)]
// NOTE(review): dead_code is allowed presumably because some fields are not read yet - confirm.
#[allow(dead_code)]
pub(super) struct NetworkConditionMonitor {
    /// Recent round-trip time measurements
    rtt_samples: VecDeque<Duration>,
    /// Maximum samples to keep
    max_samples: usize,
    /// Packet loss rate estimation
    // NOTE(review): presumably a fraction in 0.0..=1.0 - confirm against the update code
    packet_loss_rate: f64,
    /// Congestion window estimate
    congestion_window: u32,
    /// Network quality score (0.0 = poor, 1.0 = excellent)
    quality_score: f64,
    /// Last quality update time
    last_quality_update: Instant,
    /// Quality measurement interval
    quality_update_interval: Duration,
    /// Timeout statistics
    timeout_stats: TimeoutStatistics,
}
/// Statistics for timeout behavior
///
/// Embedded in `NetworkConditionMonitor::timeout_stats`. The derived
/// `Default` starts with zero counts, a zero duration, a rate of 0.0, and no
/// last-update time.
#[derive(Debug, Default)]
struct TimeoutStatistics {
    /// Total timeout events
    total_timeouts: u64,
    /// Total successful responses
    total_responses: u64,
    /// Average response time
    avg_response_time: Duration,
    /// Timeout rate (0.0 = no timeouts, 1.0 = all timeouts)
    timeout_rate: f64,
    /// Last update time (`None` until the first update)
    last_update: Option<Instant>,
}
452#[allow(dead_code)]
453impl SecurityValidationState {
454    /// Create new security validation state with default settings
455    fn new() -> Self {
456        Self {
457            candidate_rate_tracker: VecDeque::new(),
458            max_candidates_per_window: 20, // Max 20 candidates per 60 seconds
459            rate_window: Duration::from_secs(60),
460            coordination_requests: VecDeque::new(),
461            max_coordination_per_window: 5, // Max 5 coordination requests per 60 seconds
462            address_validation_cache: HashMap::new(),
463            validation_cache_timeout: Duration::from_secs(300), // 5 minute cache
464        }
465    }
466    /// Create new security validation state with custom rate limits
467    fn new_with_limits(
468        max_candidates_per_window: u32,
469        max_coordination_per_window: u32,
470        rate_window: Duration,
471    ) -> Self {
472        Self {
473            candidate_rate_tracker: VecDeque::new(),
474            max_candidates_per_window,
475            rate_window,
476            coordination_requests: VecDeque::new(),
477            max_coordination_per_window,
478            address_validation_cache: HashMap::new(),
479            validation_cache_timeout: Duration::from_secs(300),
480        }
481    }
482    /// Enhanced rate limiting with adaptive thresholds
483    ///
484    /// This implements adaptive rate limiting that adjusts based on network conditions
485    /// and detected attack patterns to prevent flooding while maintaining usability.
486    fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
487        // Clean up old entries first
488        self.cleanup_rate_tracker(now);
489        self.cleanup_coordination_tracker(now);
490        // Calculate current request rate
491        let _current_candidate_rate =
492            self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
493        let _current_coordination_rate =
494            self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
495
496        // Adaptive threshold based on peer behavior
497        let peer_reputation = self.calculate_peer_reputation(peer_id);
498        let adaptive_candidate_limit =
499            (self.max_candidates_per_window as f64 * peer_reputation) as u32;
500        let adaptive_coordination_limit =
501            (self.max_coordination_per_window as f64 * peer_reputation) as u32;
502
503        // Check if either limit is exceeded
504        if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
505            debug!(
506                "Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
507                hex::encode(&peer_id[..8]),
508                self.candidate_rate_tracker.len(),
509                adaptive_candidate_limit
510            );
511            return true;
512        }
513
514        if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
515            debug!(
516                "Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
517                hex::encode(&peer_id[..8]),
518                self.coordination_requests.len(),
519                adaptive_coordination_limit
520            );
521            return true;
522        }
523
524        false
525    }
526
527    /// Calculate peer reputation score (0.0 = bad, 1.0 = good)
528    ///
529    /// This implements a simple reputation system to adjust rate limits
530    /// based on peer behavior patterns.
531    fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
532        // Simplified reputation calculation
533        // In production, this would track:
534        // - Historical success rates
535        // - Suspicious behavior patterns
536        // - Coordination completion rates
537        // - Address validation failures
538        // For now, return a default good reputation
539        // This can be enhanced with persistent peer reputation storage
540        1.0
541    }
542
543    /// Implement amplification attack mitigation
544    ///
545    /// This prevents the bootstrap node from being used as an amplifier
546    /// in DDoS attacks by limiting server-initiated validation packets.
547    fn validate_amplification_limits(
548        &mut self,
549        source_addr: SocketAddr,
550        target_addr: SocketAddr,
551        now: Instant,
552    ) -> Result<(), NatTraversalError> {
553        // Check if we're being asked to send too many packets to the same target
554        let amplification_key = (source_addr, target_addr);
555        // Simple amplification protection: limit packets per source-target pair
556        // In production, this would be more sophisticated with:
557        // - Bandwidth tracking
558        // - Packet size ratios
559        // - Geographic analysis
560        // - Temporal pattern analysis
561
562        // For now, implement basic per-pair rate limiting
563        if self.is_amplification_suspicious(amplification_key, now) {
564            warn!(
565                "Potential amplification attack detected: {} -> {}",
566                source_addr, target_addr
567            );
568            return Err(NatTraversalError::SuspiciousCoordination);
569        }
570
571        Ok(())
572    }
573
574    /// Check for suspicious amplification patterns
575    fn is_amplification_suspicious(
576        &self,
577        _amplification_key: (SocketAddr, SocketAddr),
578        _now: Instant,
579    ) -> bool {
580        // Simplified amplification detection
581        // In production, this would track:
582        // - Request/response ratios
583        // - Bandwidth amplification factors
584        // - Temporal clustering of requests
585        // - Geographic distribution analysis
586        // For now, return false (no amplification detected)
587        // This can be enhanced with persistent amplification tracking
588        false
589    }
590
591    /// Generate cryptographically secure random values for coordination rounds
592    ///
593    /// This ensures that coordination rounds use secure random values to prevent
594    /// prediction attacks and ensure proper synchronization security.
595    fn generate_secure_coordination_round(&self) -> VarInt {
596        // Use cryptographically secure random number generation
597        let secure_random: u64 = rand::random();
598        // Ensure the value is within reasonable bounds for VarInt
599        let bounded_random = secure_random % 1000000; // Limit to reasonable range
600
601        VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
602    }
603
604    /// Enhanced address validation with security checks
605    ///
606    /// This performs comprehensive address validation including:
607    /// - Basic format validation
608    /// - Security threat detection
609    /// - Amplification attack prevention
610    /// - Suspicious pattern recognition
611    fn enhanced_address_validation(
612        &mut self,
613        addr: SocketAddr,
614        source_addr: SocketAddr,
615        now: Instant,
616    ) -> Result<AddressValidationResult, NatTraversalError> {
617        // First, perform basic address validation
618        let basic_result = self.validate_address(addr, now);
619        match basic_result {
620            AddressValidationResult::Invalid => {
621                return Err(NatTraversalError::InvalidAddress);
622            }
623            AddressValidationResult::Suspicious => {
624                return Err(NatTraversalError::SuspiciousCoordination);
625            }
626            AddressValidationResult::Valid => {
627                // Continue with enhanced validation
628            }
629        }
630
631        // Check for amplification attack patterns
632        self.validate_amplification_limits(source_addr, addr, now)?;
633
634        // Additional security checks
635        if self.is_address_in_suspicious_range(addr) {
636            warn!("Address in suspicious range detected: {}", addr);
637            return Err(NatTraversalError::SuspiciousCoordination);
638        }
639
640        if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
641            warn!(
642                "Suspicious coordination pattern detected: {} -> {}",
643                source_addr, addr
644            );
645            return Err(NatTraversalError::SuspiciousCoordination);
646        }
647
648        Ok(AddressValidationResult::Valid)
649    }
650
651    /// Check if address is in a suspicious range
652    fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
653        match addr.ip() {
654            IpAddr::V4(ipv4) => {
655                // Check for addresses commonly used in attacks
656                let octets = ipv4.octets();
657                // Reject certain reserved ranges that shouldn't be used for P2P
658                if octets[0] == 0 || octets[0] == 127 {
659                    return true;
660                }
661
662                // Check for test networks (RFC 5737)
663                if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
664                    return true;
665                }
666                if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
667                    return true;
668                }
669                if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
670                    return true;
671                }
672
673                false
674            }
675            IpAddr::V6(ipv6) => {
676                // Check for suspicious IPv6 ranges
677                if ipv6.is_loopback() || ipv6.is_unspecified() {
678                    return true;
679                }
680
681                // Check for documentation ranges (RFC 3849)
682                let segments = ipv6.segments();
683                if segments[0] == 0x2001 && segments[1] == 0x0db8 {
684                    return true;
685                }
686
687                false
688            }
689        }
690    }
691
692    /// Check for suspicious coordination patterns
693    fn is_coordination_pattern_suspicious(
694        &self,
695        _source_addr: SocketAddr,
696        _target_addr: SocketAddr,
697        _now: Instant,
698    ) -> bool {
699        // Simplified pattern detection
700        // In production, this would analyze:
701        // - Temporal patterns (too frequent requests)
702        // - Geographic patterns (unusual source/target combinations)
703        // - Behavioral patterns (consistent with known attack signatures)
704        // - Network topology patterns (suspicious routing)
705        // For now, return false (no suspicious patterns detected)
706        // This can be enhanced with machine learning-based pattern detection
707        false
708    }
709
710    /// Check if candidate rate limit is exceeded
711    fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
712        // Clean up old entries
713        self.cleanup_rate_tracker(now);
714        // Check if we've exceeded the rate limit
715        if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
716            return true;
717        }
718
719        // Record this attempt
720        self.candidate_rate_tracker.push_back(now);
721        false
722    }
723
724    /// Check if coordination rate limit is exceeded
725    fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
726        // Clean up old entries
727        self.cleanup_coordination_tracker(now);
728        // Check if we've exceeded the rate limit
729        if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
730            return true;
731        }
732
733        // Record this attempt
734        let request = CoordinationRequest { timestamp: now };
735        self.coordination_requests.push_back(request);
736        false
737    }
738
739    /// Clean up old rate tracking entries
740    fn cleanup_rate_tracker(&mut self, now: Instant) {
741        let cutoff = now - self.rate_window;
742        while let Some(&front_time) = self.candidate_rate_tracker.front() {
743            if front_time < cutoff {
744                self.candidate_rate_tracker.pop_front();
745            } else {
746                break;
747            }
748        }
749    }
750    /// Clean up old coordination tracking entries
751    fn cleanup_coordination_tracker(&mut self, now: Instant) {
752        let cutoff = now - self.rate_window;
753        while let Some(front_request) = self.coordination_requests.front() {
754            if front_request.timestamp < cutoff {
755                self.coordination_requests.pop_front();
756            } else {
757                break;
758            }
759        }
760    }
761    /// Validate an address for security concerns
762    fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
763        // Check cache first
764        if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
765            return cached_result;
766        }
767        let result = self.perform_address_validation(addr);
768
769        // Cache the result
770        self.address_validation_cache.insert(addr, result);
771
772        // Clean up old cache entries periodically
773        if self.address_validation_cache.len() > 1000 {
774            self.cleanup_address_cache(now);
775        }
776
777        result
778    }
779
780    /// Perform actual address validation
781    fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
782        match addr.ip() {
783            IpAddr::V4(ipv4) => {
784                // Check for invalid IPv4 addresses
785                if ipv4.is_unspecified() || ipv4.is_broadcast() {
786                    return AddressValidationResult::Invalid;
787                }
788                // Check for suspicious addresses
789                if ipv4.is_multicast() || ipv4.is_documentation() {
790                    return AddressValidationResult::Suspicious;
791                }
792
793                // Check for reserved ranges that shouldn't be used for P2P
794                if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
795                    return AddressValidationResult::Invalid;
796                }
797
798                // Check for common attack patterns
799                if self.is_suspicious_ipv4(ipv4) {
800                    return AddressValidationResult::Suspicious;
801                }
802            }
803            IpAddr::V6(ipv6) => {
804                // Check for invalid IPv6 addresses
805                if ipv6.is_unspecified() || ipv6.is_multicast() {
806                    return AddressValidationResult::Invalid;
807                }
808
809                // Check for suspicious IPv6 addresses
810                if self.is_suspicious_ipv6(ipv6) {
811                    return AddressValidationResult::Suspicious;
812                }
813            }
814        }
815
816        // Check port range
817        if addr.port() == 0 || addr.port() < 1024 {
818            return AddressValidationResult::Suspicious;
819        }
820
821        AddressValidationResult::Valid
822    }
823
824    /// Check for suspicious IPv4 patterns
825    fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
826        let octets = ipv4.octets();
827        // Check for patterns that might indicate scanning or attacks
828        // Sequential or patterned addresses
829        if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
830            return true;
831        }
832
833        // Check for addresses in ranges commonly used for attacks
834        // This is a simplified check - production would have more sophisticated patterns
835        false
836    }
837
838    /// Check for suspicious IPv6 patterns
839    fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
840        let segments = ipv6.segments();
841        // Check for obvious patterns
842        if segments.iter().all(|&s| s == segments[0]) {
843            return true;
844        }
845
846        false
847    }
848
849    /// Clean up old address validation cache entries
850    fn cleanup_address_cache(&mut self, _now: Instant) {
851        // Simple cleanup - remove random entries to keep size bounded
852        // In production, this would use LRU or timestamp-based cleanup
853        if self.address_validation_cache.len() > 500 {
854            let keys_to_remove: Vec<_> = self
855                .address_validation_cache
856                .keys()
857                .take(self.address_validation_cache.len() / 2)
858                .copied()
859                .collect();
860            for key in keys_to_remove {
861                self.address_validation_cache.remove(&key);
862            }
863        }
864    }
865
866    /// Comprehensive path validation for PUNCH_ME_NOW frames
867    ///
868    /// This performs security-critical validation to prevent various attacks:
869    /// - Address spoofing prevention
870    /// - Reflection attack mitigation
871    /// - Coordination request validation
872    /// - Rate limiting enforcement
873    fn validate_punch_me_now_frame(
874        &mut self,
875        frame: &crate::frame::PunchMeNow,
876        source_addr: SocketAddr,
877        peer_id: [u8; 32],
878        now: Instant,
879    ) -> Result<(), NatTraversalError> {
880        // 1. Rate limiting validation
881        if self.is_coordination_rate_limited(now) {
882            debug!(
883                "PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}",
884                hex::encode(&peer_id[..8])
885            );
886            return Err(NatTraversalError::RateLimitExceeded);
887        }
888        // 2. Address validation - validate the address claimed in the frame
889        let addr_validation = self.validate_address(frame.address, now);
890        match addr_validation {
891            AddressValidationResult::Invalid => {
892                debug!(
893                    "PUNCH_ME_NOW frame rejected: invalid address {:?} from peer {:?}",
894                    frame.address,
895                    hex::encode(&peer_id[..8])
896                );
897                return Err(NatTraversalError::InvalidAddress);
898            }
899            AddressValidationResult::Suspicious => {
900                debug!(
901                    "PUNCH_ME_NOW frame rejected: suspicious address {:?} from peer {:?}",
902                    frame.address,
903                    hex::encode(&peer_id[..8])
904                );
905                return Err(NatTraversalError::SuspiciousCoordination);
906            }
907            AddressValidationResult::Valid => {
908                // Continue validation
909            }
910        }
911
912        // 3. Source address consistency validation
913        // The frame's address should reasonably relate to the actual source
914        if !self.validate_address_consistency(frame.address, source_addr) {
915            debug!(
916                "PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
917                frame.address, source_addr
918            );
919            return Err(NatTraversalError::SuspiciousCoordination);
920        }
921
922        // 4. Coordination parameters validation
923        if !self.validate_coordination_parameters(frame) {
924            debug!(
925                "PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
926                hex::encode(&peer_id[..8])
927            );
928            return Err(NatTraversalError::SuspiciousCoordination);
929        }
930
931        // 5. Target peer validation (if present)
932        if let Some(target_peer_id) = frame.target_peer_id {
933            if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
934                debug!(
935                    "PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
936                    hex::encode(&peer_id[..8]),
937                    hex::encode(&target_peer_id[..8])
938                );
939                return Err(NatTraversalError::SuspiciousCoordination);
940            }
941        }
942
943        // 6. Resource limits validation
944        if !self.validate_resource_limits(frame) {
945            debug!(
946                "PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
947                hex::encode(&peer_id[..8])
948            );
949            return Err(NatTraversalError::ResourceLimitExceeded);
950        }
951
952        debug!(
953            "PUNCH_ME_NOW frame validation passed for peer {:?}",
954            hex::encode(&peer_id[..8])
955        );
956        Ok(())
957    }
958
959    /// Validate address consistency between claimed and observed addresses
960    ///
961    /// This prevents address spoofing by ensuring the claimed local address
962    /// is reasonably consistent with the observed source address.
963    fn validate_address_consistency(
964        &self,
965        claimed_addr: SocketAddr,
966        observed_addr: SocketAddr,
967    ) -> bool {
968        // For P2P NAT traversal, the port will typically be different due to NAT,
969        // but the IP should be consistent unless there's multi-homing or proxying
970        // Check if IPs are in the same family
971        match (claimed_addr.ip(), observed_addr.ip()) {
972            (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
973                // For IPv4, allow same IP or addresses in same private range
974                if claimed_ip == observed_ip {
975                    return true;
976                }
977
978                // Allow within same private network (simplified check)
979                if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
980                    return true;
981                }
982
983                // Allow certain NAT scenarios where external IP differs
984                // This is a simplified check - production would be more sophisticated
985                !claimed_ip.is_private() && !observed_ip.is_private()
986            }
987            (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
988                // For IPv6, be more lenient due to complex addressing
989                claimed_ip == observed_ip || self.are_in_same_prefix_v6(claimed_ip, observed_ip)
990            }
991            _ => {
992                // Mismatched IP families - suspicious
993                false
994            }
995        }
996    }
997
998    /// Check if two IPv4 addresses are in the same private network
999    fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
1000        // Check common private ranges
1001        let ip1_octets = ip1.octets();
1002        let ip2_octets = ip2.octets();
1003        // 10.0.0.0/8
1004        if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
1005            return true;
1006        }
1007
1008        // 172.16.0.0/12
1009        if ip1_octets[0] == 172
1010            && ip2_octets[0] == 172
1011            && (16..=31).contains(&ip1_octets[1])
1012            && (16..=31).contains(&ip2_octets[1])
1013        {
1014            return true;
1015        }
1016
1017        // 192.168.0.0/16
1018        if ip1_octets[0] == 192
1019            && ip1_octets[1] == 168
1020            && ip2_octets[0] == 192
1021            && ip2_octets[1] == 168
1022        {
1023            return true;
1024        }
1025
1026        false
1027    }
1028
1029    /// Check if two IPv6 addresses are in the same prefix
1030    fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1031        // Simplified IPv6 prefix check - compare first 64 bits
1032        let segments1 = ip1.segments();
1033        let segments2 = ip2.segments();
1034        segments1[0] == segments2[0]
1035            && segments1[1] == segments2[1]
1036            && segments1[2] == segments2[2]
1037            && segments1[3] == segments2[3]
1038    }
1039
1040    /// Validate coordination parameters for security
1041    fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1042        // Check round number is reasonable (not too large to prevent overflow attacks)
1043        if frame.round.into_inner() > 1000000 {
1044            return false;
1045        }
1046        // Check target sequence is reasonable
1047        if frame.paired_with_sequence_number.into_inner() > 10000 {
1048            return false;
1049        }
1050
1051        // Validate address is not obviously invalid
1052        match frame.address.ip() {
1053            IpAddr::V4(ipv4) => {
1054                // Reject obviously invalid addresses
1055                !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1056            }
1057            IpAddr::V6(ipv6) => {
1058                // Reject obviously invalid addresses
1059                !ipv6.is_unspecified() && !ipv6.is_multicast()
1060            }
1061        }
1062    }
1063
1064    /// Validate target peer request for potential abuse
1065    fn validate_target_peer_request(
1066        &self,
1067        requesting_peer: [u8; 32],
1068        target_peer: [u8; 32],
1069        _frame: &crate::frame::PunchMeNow,
1070    ) -> bool {
1071        // Prevent self-coordination (peer requesting coordination with itself)
1072        if requesting_peer == target_peer {
1073            return false;
1074        }
1075        // Additional validation could include:
1076        // - Check if target peer is known/registered
1077        // - Validate target peer hasn't opted out of coordination
1078        // - Check for suspicious patterns in target peer selection
1079
1080        true
1081    }
1082
1083    /// Validate resource limits for the coordination request
1084    fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1085        // Check current load and resource usage
1086        // This is a simplified check - production would monitor:
1087        // - Active coordination sessions
1088        // - Memory usage
1089        // - Network bandwidth
1090        // - CPU utilization
1091        // For now, just check if we have too many active coordination requests
1092        self.coordination_requests.len() < self.max_coordination_per_window as usize
1093    }
1094}
1095
1096impl AdaptiveTimeoutState {
1097    /// Create new adaptive timeout state with default values
1098    pub(crate) fn new() -> Self {
1099        let base_timeout = Duration::from_millis(1000); // 1 second base
1100        Self {
1101            current_timeout: base_timeout,
1102            min_timeout: Duration::from_millis(100),
1103            max_timeout: Duration::from_secs(30),
1104            base_timeout,
1105            backoff_multiplier: 1.0,
1106            max_backoff_multiplier: 8.0,
1107            jitter_factor: 0.1, // 10% jitter
1108            srtt: None,
1109            rttvar: None,
1110            last_rtt: None,
1111            consecutive_timeouts: 0,
1112            successful_responses: 0,
1113        }
1114    }
1115    /// Update timeout based on successful response
1116    fn update_success(&mut self, rtt: Duration) {
1117        self.last_rtt = Some(rtt);
1118        self.successful_responses += 1;
1119        self.consecutive_timeouts = 0;
1120        // Update smoothed RTT using TCP algorithm
1121        match self.srtt {
1122            None => {
1123                self.srtt = Some(rtt);
1124                self.rttvar = Some(rtt / 2);
1125            }
1126            Some(srtt) => {
1127                let rttvar = self.rttvar.unwrap_or(rtt / 2);
1128                let abs_diff = rtt.abs_diff(srtt);
1129
1130                self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1131                self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1132            }
1133        }
1134
1135        // Reduce backoff multiplier on success
1136        self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1137
1138        // Update current timeout
1139        self.calculate_current_timeout();
1140    }
1141
1142    /// Update timeout based on timeout event
1143    fn update_timeout(&mut self) {
1144        self.consecutive_timeouts += 1;
1145        // Exponential backoff with bounds
1146        self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1147
1148        // Update current timeout
1149        self.calculate_current_timeout();
1150    }
1151
1152    /// Calculate current timeout based on conditions
1153    fn calculate_current_timeout(&mut self) {
1154        let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1155            // Use TCP-style RTO calculation: RTO = SRTT + 4 * RTTVAR
1156            srtt + rttvar * 4
1157        } else {
1158            self.base_timeout
1159        };
1160        // Apply backoff multiplier
1161        let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1162
1163        // Apply jitter to prevent thundering herd
1164        let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1165        let timeout = timeout.mul_f64(jitter);
1166
1167        // Bound the timeout
1168        self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1169    }
1170
1171    /// Get current timeout value
1172    fn get_timeout(&self) -> Duration {
1173        self.current_timeout
1174    }
1175    /// Check if retry should be attempted
1176    fn should_retry(&self, max_retries: u32) -> bool {
1177        self.consecutive_timeouts < max_retries
1178    }
1179    /// Get retry delay with exponential backoff
1180    fn get_retry_delay(&self) -> Duration {
1181        let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1182        delay.clamp(self.min_timeout, self.max_timeout)
1183    }
1184}
/// Limits and timeouts that drive [`ResourceCleanupCoordinator`].
///
/// All fields are private to this module. Values are supplied by
/// `ResourceManagementConfig::new` (and by `low_memory` when the
/// `low_memory` feature is enabled).
#[derive(Debug)]
#[allow(dead_code)]
pub(super) struct ResourceManagementConfig {
    /// Maximum number of active path validations; checked by
    /// `check_resource_limits` and counted towards the memory-pressure limit.
    max_active_validations: usize,
    /// Maximum number of local candidates; checked by `check_resource_limits`
    /// and counted towards the memory-pressure limit.
    max_local_candidates: usize,
    /// Maximum number of remote candidates; checked by
    /// `check_resource_limits` and counted towards the memory-pressure limit.
    max_remote_candidates: usize,
    /// Maximum number of candidate pairs; checked by `check_resource_limits`
    /// and counted towards the memory-pressure limit.
    max_candidate_pairs: usize,
    /// Maximum coordination rounds to keep in history.
    // NOTE(review): no reader of this field is visible near the coordinator
    // code; confirm where it is enforced.
    max_coordination_history: usize,
    /// Minimum time between regular cleanups (see `should_cleanup`).
    cleanup_interval: Duration,
    /// Age after which candidates and candidate pairs are treated as stale.
    /// Aggressive cleanup uses half of this value.
    candidate_timeout: Duration,
    /// Age after which an active path validation is treated as expired.
    /// Aggressive cleanup uses half of this value.
    validation_timeout: Duration,
    /// Age, measured from the round start, after which coordination state is
    /// discarded.
    coordination_timeout: Duration,
    /// Memory-pressure level above which `should_cleanup` requests a cleanup.
    /// Pressure is the ratio of tracked entries to the summed limits.
    memory_pressure_threshold: f64,
    /// Memory-pressure level intended to trigger aggressive cleanup.
    // NOTE(review): no reader of this field is visible near the coordinator
    // code; confirm where it is compared.
    aggressive_cleanup_threshold: f64,
}
/// Resource usage counters maintained by [`ResourceCleanupCoordinator`].
///
/// `Default` yields all-zero counters, no recorded cleanup, and zero pressure.
#[derive(Debug, Default)]
#[allow(dead_code)]
pub(super) struct ResourceStats {
    /// Number of active validations, as last reported to `update_stats`.
    active_validations: usize,
    /// Number of local candidates, as last reported to `update_stats`.
    local_candidates: usize,
    /// Number of remote candidates, as last reported to `update_stats`.
    remote_candidates: usize,
    /// Number of candidate pairs, as last reported to `update_stats`.
    candidate_pairs: usize,
    /// Highest combined entry count (validations + local candidates + remote
    /// candidates + pairs) seen by `update_stats`. This is a count of tracked
    /// entries, not a size in bytes.
    peak_memory_usage: usize,
    /// Number of cleanup operations performed.
    cleanup_operations: u64,
    /// Total number of resources removed by `cleanup_expired_resources`.
    resources_cleaned: u64,
    /// Number of resource allocation failures.
    // NOTE(review): no writer of this field is visible near the coordinator
    // code; confirm where it is updated.
    allocation_failures: u64,
    /// Last cleanup time.
    // NOTE(review): the coordinator records cleanup times in its own
    // `last_cleanup` field; confirm whether this one is ever set.
    last_cleanup: Option<Instant>,
    /// Ratio of tracked entries to the summed configured limits, as last
    /// computed by `calculate_memory_pressure`. The value is not clamped, so
    /// it exceeds 1.0 when usage is above the limits.
    memory_pressure: f64,
}
/// Decides when NAT traversal resources should be cleaned up and performs
/// the cleanup on collections handed to it by the caller.
#[derive(Debug)]
pub(super) struct ResourceCleanupCoordinator {
    /// Limits and timeouts applied by the cleanup routines.
    config: ResourceManagementConfig,
    /// Usage counters updated by the cleanup and statistics routines.
    stats: ResourceStats,
    /// Time of the most recent cleanup, or `None` before the first one. It
    /// is read by `should_cleanup` to enforce the cleanup interval.
    last_cleanup: Option<Instant>,
    /// Number of cleanups performed so far.
    cleanup_counter: u64,
    /// Set by `request_shutdown`; once set, `should_cleanup` always returns
    /// `true`.
    shutdown_requested: bool,
}
1251impl ResourceManagementConfig {
1252    /// Create new resource management configuration with production-ready defaults
1253    fn new() -> Self {
1254        Self {
1255            max_active_validations: 100,
1256            max_local_candidates: 50,
1257            max_remote_candidates: 100,
1258            max_candidate_pairs: 200,
1259            max_coordination_history: 10,
1260            cleanup_interval: Duration::from_secs(30),
1261            candidate_timeout: Duration::from_secs(300), // 5 minutes
1262            validation_timeout: Duration::from_secs(30),
1263            coordination_timeout: Duration::from_secs(60),
1264            memory_pressure_threshold: 0.75,
1265            aggressive_cleanup_threshold: 0.90,
1266        }
1267    }
1268    /// Create configuration optimized for low-memory environments
1269    #[cfg(feature = "low_memory")]
1270    fn low_memory() -> Self {
1271        Self {
1272            max_active_validations: 25,
1273            max_local_candidates: 10,
1274            max_remote_candidates: 25,
1275            max_candidate_pairs: 50,
1276            max_coordination_history: 3,
1277            cleanup_interval: Duration::from_secs(15),
1278            candidate_timeout: Duration::from_secs(180), // 3 minutes
1279            validation_timeout: Duration::from_secs(20),
1280            coordination_timeout: Duration::from_secs(30),
1281            memory_pressure_threshold: 0.60,
1282            aggressive_cleanup_threshold: 0.80,
1283        }
1284    }
1285}
1286#[allow(dead_code)]
1287impl ResourceCleanupCoordinator {
1288    /// Create new resource cleanup coordinator
1289    fn new() -> Self {
1290        Self {
1291            config: ResourceManagementConfig::new(),
1292            stats: ResourceStats::default(),
1293            last_cleanup: None,
1294            cleanup_counter: 0,
1295            shutdown_requested: false,
1296        }
1297    }
1298    /// Create coordinator optimized for low-memory environments
1299    #[cfg(feature = "low_memory")]
1300    fn low_memory() -> Self {
1301        Self {
1302            config: ResourceManagementConfig::low_memory(),
1303            stats: ResourceStats::default(),
1304            last_cleanup: None,
1305            cleanup_counter: 0,
1306            shutdown_requested: false,
1307        }
1308    }
1309    /// Check if resource limits are exceeded
1310    fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1311        state.active_validations.len() > self.config.max_active_validations
1312            || state.local_candidates.len() > self.config.max_local_candidates
1313            || state.remote_candidates.len() > self.config.max_remote_candidates
1314            || state.candidate_pairs.len() > self.config.max_candidate_pairs
1315    }
1316    /// Calculate current memory pressure level
1317    fn calculate_memory_pressure(
1318        &mut self,
1319        active_validations_len: usize,
1320        local_candidates_len: usize,
1321        remote_candidates_len: usize,
1322        candidate_pairs_len: usize,
1323    ) -> f64 {
1324        let total_limit = self.config.max_active_validations
1325            + self.config.max_local_candidates
1326            + self.config.max_remote_candidates
1327            + self.config.max_candidate_pairs;
1328        let current_usage = active_validations_len
1329            + local_candidates_len
1330            + remote_candidates_len
1331            + candidate_pairs_len;
1332
1333        let pressure = current_usage as f64 / total_limit as f64;
1334        self.stats.memory_pressure = pressure;
1335        pressure
1336    }
1337
1338    /// Determine if cleanup is needed
1339    fn should_cleanup(&self, now: Instant) -> bool {
1340        if self.shutdown_requested {
1341            return true;
1342        }
1343        // Check if it's time for regular cleanup
1344        if let Some(last_cleanup) = self.last_cleanup {
1345            if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1346                return true;
1347            }
1348        } else {
1349            return true; // First cleanup
1350        }
1351
1352        // Check memory pressure
1353        if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1354            return true;
1355        }
1356
1357        false
1358    }
1359
1360    /// Perform cleanup of expired resources
1361    fn cleanup_expired_resources(
1362        &mut self,
1363        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1364        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1365        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1366        candidate_pairs: &mut Vec<CandidatePair>,
1367        coordination: &mut Option<CoordinationState>,
1368        now: Instant,
1369    ) -> u64 {
1370        let mut cleaned = 0;
1371        // Clean up expired path validations
1372        cleaned += self.cleanup_expired_validations(active_validations, now);
1373
1374        // Clean up stale candidates
1375        cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1376
1377        // Clean up failed candidate pairs
1378        cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1379
1380        // Clean up old coordination state
1381        cleaned += self.cleanup_old_coordination(coordination, now);
1382
1383        // Update statistics
1384        self.stats.cleanup_operations += 1;
1385        self.stats.resources_cleaned += cleaned;
1386        self.last_cleanup = Some(now);
1387        self.cleanup_counter += 1;
1388
1389        debug!("Cleaned up {} expired resources", cleaned);
1390        cleaned
1391    }
1392
1393    /// Clean up expired path validations
1394    fn cleanup_expired_validations(
1395        &mut self,
1396        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1397        now: Instant,
1398    ) -> u64 {
1399        let mut cleaned = 0;
1400        let validation_timeout = self.config.validation_timeout;
1401        active_validations.retain(|_addr, validation| {
1402            let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1403            if is_expired {
1404                cleaned += 1;
1405                trace!("Cleaned up expired validation for {:?}", _addr);
1406            }
1407            !is_expired
1408        });
1409
1410        cleaned
1411    }
1412
1413    /// Clean up stale candidates
1414    fn cleanup_stale_candidates(
1415        &mut self,
1416        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1417        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1418        now: Instant,
1419    ) -> u64 {
1420        let mut cleaned = 0;
1421        let candidate_timeout = self.config.candidate_timeout;
1422        // Clean up local candidates
1423        local_candidates.retain(|_seq, candidate| {
1424            let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1425                || candidate.state == CandidateState::Failed
1426                || candidate.state == CandidateState::Removed;
1427            if is_stale {
1428                cleaned += 1;
1429                trace!("Cleaned up stale local candidate {:?}", candidate.address);
1430            }
1431            !is_stale
1432        });
1433
1434        // Clean up remote candidates
1435        remote_candidates.retain(|_seq, candidate| {
1436            let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1437                || candidate.state == CandidateState::Failed
1438                || candidate.state == CandidateState::Removed;
1439            if is_stale {
1440                cleaned += 1;
1441                trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1442            }
1443            !is_stale
1444        });
1445
1446        cleaned
1447    }
1448
1449    /// Clean up failed candidate pairs
1450    fn cleanup_failed_pairs(
1451        &mut self,
1452        candidate_pairs: &mut Vec<CandidatePair>,
1453        now: Instant,
1454    ) -> u64 {
1455        let mut cleaned = 0;
1456        let pair_timeout = self.config.candidate_timeout;
1457        candidate_pairs.retain(|pair| {
1458            let is_stale = now.duration_since(pair.created_at) > pair_timeout
1459                || pair.state == PairState::Failed;
1460            if is_stale {
1461                cleaned += 1;
1462                trace!(
1463                    "Cleaned up failed candidate pair {:?} -> {:?}",
1464                    pair.local_addr, pair.remote_addr
1465                );
1466            }
1467            !is_stale
1468        });
1469
1470        cleaned
1471    }
1472
1473    /// Clean up old coordination state
1474    fn cleanup_old_coordination(
1475        &mut self,
1476        coordination: &mut Option<CoordinationState>,
1477        now: Instant,
1478    ) -> u64 {
1479        let mut cleaned = 0;
1480        if let Some(coord) = coordination {
1481            let is_expired =
1482                now.duration_since(coord.round_start) > self.config.coordination_timeout;
1483            let is_failed = coord.state == CoordinationPhase::Failed;
1484
1485            if is_expired || is_failed {
1486                let round = coord.round;
1487                *coordination = None;
1488                cleaned += 1;
1489                trace!("Cleaned up old coordination state for round {}", round);
1490            }
1491        }
1492
1493        cleaned
1494    }
1495
1496    /// Perform aggressive cleanup when under memory pressure
1497    fn aggressive_cleanup(
1498        &mut self,
1499        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1500        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1501        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1502        candidate_pairs: &mut Vec<CandidatePair>,
1503        now: Instant,
1504    ) -> u64 {
1505        let mut cleaned = 0;
1506        // More aggressive timeout for candidates
1507        let aggressive_timeout = self.config.candidate_timeout / 2;
1508
1509        // Clean up older candidates first
1510        local_candidates.retain(|_seq, candidate| {
1511            let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1512                && candidate.state != CandidateState::Failed;
1513            if !keep {
1514                cleaned += 1;
1515            }
1516            keep
1517        });
1518
1519        remote_candidates.retain(|_seq, candidate| {
1520            let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1521                && candidate.state != CandidateState::Failed;
1522            if !keep {
1523                cleaned += 1;
1524            }
1525            keep
1526        });
1527
1528        // Clean up waiting candidate pairs
1529        candidate_pairs.retain(|pair| {
1530            let keep = pair.state != PairState::Waiting
1531                || now.duration_since(pair.created_at) <= aggressive_timeout;
1532            if !keep {
1533                cleaned += 1;
1534            }
1535            keep
1536        });
1537
1538        // Clean up old validations more aggressively
1539        active_validations.retain(|_addr, validation| {
1540            let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1541            if !keep {
1542                cleaned += 1;
1543            }
1544            keep
1545        });
1546
1547        warn!(
1548            "Aggressive cleanup removed {} resources due to memory pressure",
1549            cleaned
1550        );
1551        cleaned
1552    }
1553
1554    /// Request graceful shutdown and cleanup
1555    fn request_shutdown(&mut self) {
1556        self.shutdown_requested = true;
1557        debug!("Resource cleanup coordinator shutdown requested");
1558    }
1559    /// Perform final cleanup during shutdown
1560    fn shutdown_cleanup(
1561        &mut self,
1562        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1563        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1564        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1565        candidate_pairs: &mut Vec<CandidatePair>,
1566        coordination: &mut Option<CoordinationState>,
1567    ) -> u64 {
1568        let mut cleaned = 0;
1569        // Clear all resources
1570        cleaned += active_validations.len() as u64;
1571        active_validations.clear();
1572
1573        cleaned += local_candidates.len() as u64;
1574        local_candidates.clear();
1575
1576        cleaned += remote_candidates.len() as u64;
1577        remote_candidates.clear();
1578
1579        cleaned += candidate_pairs.len() as u64;
1580        candidate_pairs.clear();
1581
1582        if coordination.is_some() {
1583            *coordination = None;
1584            cleaned += 1;
1585        }
1586
1587        info!("Shutdown cleanup removed {} resources", cleaned);
1588        cleaned
1589    }
1590
1591    /// Get current resource usage statistics
1592    fn get_resource_stats(&self) -> &ResourceStats {
1593        &self.stats
1594    }
1595    /// Update resource usage statistics
1596    fn update_stats(
1597        &mut self,
1598        active_validations_len: usize,
1599        local_candidates_len: usize,
1600        remote_candidates_len: usize,
1601        candidate_pairs_len: usize,
1602    ) {
1603        self.stats.active_validations = active_validations_len;
1604        self.stats.local_candidates = local_candidates_len;
1605        self.stats.remote_candidates = remote_candidates_len;
1606        self.stats.candidate_pairs = candidate_pairs_len;
1607        // Update peak memory usage
1608        let current_usage = self.stats.active_validations
1609            + self.stats.local_candidates
1610            + self.stats.remote_candidates
1611            + self.stats.candidate_pairs;
1612
1613        if current_usage > self.stats.peak_memory_usage {
1614            self.stats.peak_memory_usage = current_usage;
1615        }
1616    }
1617
1618    /// Perform resource cleanup based on current state
1619    pub(super) fn perform_cleanup(&mut self, now: Instant) {
1620        self.last_cleanup = Some(now);
1621        self.cleanup_counter += 1;
1622        // Update cleanup statistics
1623        self.stats.cleanup_operations += 1;
1624
1625        debug!("Performed resource cleanup #{}", self.cleanup_counter);
1626    }
1627}
1628
1629#[allow(dead_code)]
1630impl NetworkConditionMonitor {
1631    /// Create new network condition monitor
1632    fn new() -> Self {
1633        Self {
1634            rtt_samples: VecDeque::new(),
1635            max_samples: 20,
1636            packet_loss_rate: 0.0,
1637            congestion_window: 10,
1638            quality_score: 0.8, // Start with good quality assumption
1639            last_quality_update: Instant::now(),
1640            quality_update_interval: Duration::from_secs(10),
1641            timeout_stats: TimeoutStatistics::default(),
1642        }
1643    }
1644    /// Record a successful response time
1645    fn record_success(&mut self, rtt: Duration, now: Instant) {
1646        // Add RTT sample
1647        self.rtt_samples.push_back(rtt);
1648        if self.rtt_samples.len() > self.max_samples {
1649            self.rtt_samples.pop_front();
1650        }
1651        // Update timeout statistics
1652        self.timeout_stats.total_responses += 1;
1653        self.update_timeout_stats(now);
1654
1655        // Update quality score
1656        self.update_quality_score(now);
1657    }
1658
1659    /// Record a timeout event
1660    fn record_timeout(&mut self, now: Instant) {
1661        self.timeout_stats.total_timeouts += 1;
1662        self.update_timeout_stats(now);
1663        // Update quality score
1664        self.update_quality_score(now);
1665    }
1666
1667    /// Update timeout statistics
1668    fn update_timeout_stats(&mut self, now: Instant) {
1669        let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1670        if total_attempts > 0 {
1671            self.timeout_stats.timeout_rate =
1672                self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1673        }
1674
1675        // Calculate average response time
1676        if !self.rtt_samples.is_empty() {
1677            let total_rtt: Duration = self.rtt_samples.iter().sum();
1678            self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1679        }
1680
1681        self.timeout_stats.last_update = Some(now);
1682    }
1683
1684    /// Update network quality score
1685    fn update_quality_score(&mut self, now: Instant) {
1686        if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1687            return;
1688        }
1689        // Quality factors
1690        let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1691        let rtt_factor = self.calculate_rtt_factor();
1692        let consistency_factor = self.calculate_consistency_factor();
1693
1694        // Weighted quality score
1695        let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1696
1697        // Smooth the quality score
1698        self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1699        self.last_quality_update = now;
1700    }
1701
1702    /// Calculate RTT factor for quality score
1703    fn calculate_rtt_factor(&self) -> f64 {
1704        if self.rtt_samples.is_empty() {
1705            return 0.5; // Neutral score
1706        }
1707        let avg_rtt = self.timeout_stats.avg_response_time;
1708
1709        // Good RTT: < 50ms = 1.0, Poor RTT: > 1000ms = 0.0
1710        let rtt_ms = avg_rtt.as_millis() as f64;
1711        let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1712        factor.clamp(0.0, 1.0)
1713    }
1714
1715    /// Calculate consistency factor for quality score
1716    fn calculate_consistency_factor(&self) -> f64 {
1717        if self.rtt_samples.len() < 3 {
1718            return 0.5; // Neutral score
1719        }
1720        // Calculate RTT variance
1721        let mean_rtt = self.timeout_stats.avg_response_time;
1722        let variance: f64 = self
1723            .rtt_samples
1724            .iter()
1725            .map(|rtt| {
1726                let diff = (*rtt).abs_diff(mean_rtt);
1727                diff.as_millis() as f64
1728            })
1729            .map(|diff| diff * diff)
1730            .sum::<f64>()
1731            / self.rtt_samples.len() as f64;
1732
1733        let std_dev = variance.sqrt();
1734
1735        // Low variance = high consistency
1736        let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1737        consistency.clamp(0.0, 1.0)
1738    }
1739
1740    /// Get current network quality score
1741    fn get_quality_score(&self) -> f64 {
1742        self.quality_score
1743    }
1744    /// Get estimated RTT based on recent samples
1745    fn get_estimated_rtt(&self) -> Option<Duration> {
1746        if self.rtt_samples.is_empty() {
1747            return None;
1748        }
1749        Some(self.timeout_stats.avg_response_time)
1750    }
1751
1752    /// Check if network conditions are suitable for coordination
1753    fn is_suitable_for_coordination(&self) -> bool {
1754        // Require reasonable quality for coordination attempts
1755        self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1756    }
1757    /// Get estimated packet loss rate
1758    fn get_packet_loss_rate(&self) -> f64 {
1759        self.packet_loss_rate
1760    }
1761
1762    /// Get recommended timeout multiplier based on conditions
1763    fn get_timeout_multiplier(&self) -> f64 {
1764        let base_multiplier = 1.0;
1765
1766        // Adjust based on quality score
1767        let quality_multiplier = if self.quality_score < 0.3 {
1768            2.0 // Poor quality, increase timeouts
1769        } else if self.quality_score > 0.8 {
1770            0.8 // Good quality, reduce timeouts
1771        } else {
1772            1.0 // Neutral
1773        };
1774
1775        // Adjust based on packet loss
1776        let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1777
1778        base_multiplier * quality_multiplier * loss_multiplier
1779    }
1780
1781    /// Clean up old samples and statistics
1782    fn cleanup(&mut self, now: Instant) {
1783        // Remove old RTT samples (keep only recent ones)
1784        let _cutoff_time = now - Duration::from_secs(60);
1785
1786        // Reset statistics if they're too old
1787        if let Some(last_update) = self.timeout_stats.last_update {
1788            if now.duration_since(last_update) > Duration::from_secs(300) {
1789                self.timeout_stats = TimeoutStatistics::default();
1790            }
1791        }
1792    }
1793}
1794
1795#[allow(dead_code)]
1796impl NatTraversalState {
1797    /// Create new NAT traversal state with given configuration
1798    ///
1799    /// v0.13.0: Role parameter removed - all nodes are symmetric P2P nodes.
1800    /// Every node can initiate, accept, and coordinate NAT traversal.
1801    pub(super) fn new(max_candidates: u32, coordination_timeout: Duration) -> Self {
1802        // v0.13.0: All nodes can coordinate - always create coordinator
1803        let bootstrap_coordinator = Some(BootstrapCoordinator::new(BootstrapConfig::default()));
1804
1805        Self {
1806            // v0.13.0: role field removed - all nodes are symmetric
1807            local_candidates: HashMap::new(),
1808            remote_candidates: HashMap::new(),
1809            candidate_pairs: Vec::new(),
1810            pair_index: HashMap::new(),
1811            active_validations: HashMap::new(),
1812            coordination: None,
1813            next_sequence: VarInt::from_u32(1),
1814            max_candidates,
1815            coordination_timeout,
1816            stats: NatTraversalStats::default(),
1817            security_state: SecurityValidationState::new(),
1818            network_monitor: NetworkConditionMonitor::new(),
1819            resource_manager: ResourceCleanupCoordinator::new(),
1820            bootstrap_coordinator,
1821        }
1822    }
1823
1824    /// Add a remote candidate from AddAddress frame with security validation
1825    pub(super) fn add_remote_candidate(
1826        &mut self,
1827        sequence: VarInt,
1828        address: SocketAddr,
1829        priority: VarInt,
1830        now: Instant,
1831    ) -> Result<(), NatTraversalError> {
1832        // Resource management: Check if we should reject new resources
1833        if self.should_reject_new_resources(now) {
1834            debug!(
1835                "Rejecting new candidate due to resource limits: {}",
1836                address
1837            );
1838            return Err(NatTraversalError::ResourceLimitExceeded);
1839        }
1840        // Security validation: Check rate limiting
1841        if self.security_state.is_candidate_rate_limited(now) {
1842            self.stats.rate_limit_violations += 1;
1843            debug!("Rate limit exceeded for candidate addition: {}", address);
1844            return Err(NatTraversalError::RateLimitExceeded);
1845        }
1846
1847        // Security validation: Validate address format and safety
1848        match self.security_state.validate_address(address, now) {
1849            AddressValidationResult::Invalid => {
1850                self.stats.invalid_address_rejections += 1;
1851                self.stats.security_rejections += 1;
1852                debug!("Invalid address rejected: {}", address);
1853                return Err(NatTraversalError::InvalidAddress);
1854            }
1855            AddressValidationResult::Suspicious => {
1856                self.stats.security_rejections += 1;
1857                debug!("Suspicious address rejected: {}", address);
1858                return Err(NatTraversalError::SecurityValidationFailed);
1859            }
1860            AddressValidationResult::Valid => {
1861                // Continue with normal processing
1862            }
1863        }
1864
1865        // Check candidate count limit
1866        if self.remote_candidates.len() >= self.max_candidates as usize {
1867            return Err(NatTraversalError::TooManyCandidates);
1868        }
1869
1870        // Check for duplicate addresses (different sequence, same address)
1871        if self
1872            .remote_candidates
1873            .values()
1874            .any(|c| c.address == address && c.state != CandidateState::Removed)
1875        {
1876            return Err(NatTraversalError::DuplicateAddress);
1877        }
1878
1879        let candidate = AddressCandidate {
1880            address,
1881            priority: priority.into_inner() as u32,
1882            source: CandidateSource::Peer,
1883            discovered_at: now,
1884            state: CandidateState::New,
1885            attempt_count: 0,
1886            last_attempt: None,
1887        };
1888
1889        self.remote_candidates.insert(sequence, candidate);
1890        self.stats.remote_candidates_received += 1;
1891
1892        trace!(
1893            "Added remote candidate: {} with priority {}",
1894            address, priority
1895        );
1896        Ok(())
1897    }
1898
1899    /// Remove a candidate by sequence number
1900    pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
1901        if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
1902            candidate.state = CandidateState::Removed;
1903            // Cancel any active validation for this address
1904            self.active_validations.remove(&candidate.address);
1905            true
1906        } else {
1907            false
1908        }
1909    }
1910
1911    /// Add a local candidate that we've discovered
1912    #[allow(clippy::expect_used)]
1913    pub(super) fn add_local_candidate(
1914        &mut self,
1915        address: SocketAddr,
1916        source: CandidateSource,
1917        now: Instant,
1918    ) -> VarInt {
1919        let sequence = self.next_sequence;
1920        self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
1921            .expect("sequence number overflow");
1922        // Calculate priority for this candidate
1923        let candidate_type = classify_candidate_type(source);
1924        let local_preference = self.calculate_local_preference(address);
1925        let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
1926
1927        let candidate = AddressCandidate {
1928            address,
1929            priority,
1930            source,
1931            discovered_at: now,
1932            state: CandidateState::New,
1933            attempt_count: 0,
1934            last_attempt: None,
1935        };
1936
1937        self.local_candidates.insert(sequence, candidate);
1938        self.stats.local_candidates_sent += 1;
1939
1940        // Regenerate pairs when we add a new local candidate
1941        self.generate_candidate_pairs(now);
1942
1943        sequence
1944    }
1945
1946    /// Calculate local preference for address prioritization
1947    fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
1948        match addr {
1949            SocketAddr::V4(v4) => {
1950                if v4.ip().is_loopback() {
1951                    0 // Lowest priority
1952                } else if v4.ip().is_private() {
1953                    65000 // High priority for local network
1954                } else {
1955                    32000 // Medium priority for public addresses
1956                }
1957            }
1958            SocketAddr::V6(v6) => {
1959                if v6.ip().is_loopback() {
1960                    0
1961                } else if v6.ip().segments()[0] == 0xfe80 {
1962                    // Link-local IPv6 check
1963                    30000 // Link-local gets medium-low priority
1964                } else {
1965                    50000 // IPv6 generally gets good priority
1966                }
1967            }
1968        }
1969    }
1970    /// Generate all possible candidate pairs from local and remote candidates
1971    pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
1972        self.candidate_pairs.clear();
1973        self.pair_index.clear();
1974        // Pre-allocate capacity to avoid reallocations
1975        let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
1976        self.candidate_pairs.reserve(estimated_capacity);
1977        self.pair_index.reserve(estimated_capacity);
1978
1979        // Cache compatibility checks to avoid repeated work
1980        let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
1981
1982        for local_candidate in self.local_candidates.values() {
1983            // Skip removed candidates early
1984            if local_candidate.state == CandidateState::Removed {
1985                continue;
1986            }
1987
1988            // Pre-classify local candidate type once
1989            let local_type = classify_candidate_type(local_candidate.source);
1990
1991            for (remote_seq, remote_candidate) in &self.remote_candidates {
1992                // Skip removed candidates
1993                if remote_candidate.state == CandidateState::Removed {
1994                    continue;
1995                }
1996
1997                // Check compatibility with caching
1998                let cache_key = (local_candidate.address, remote_candidate.address);
1999                let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
2000                    are_candidates_compatible(local_candidate, remote_candidate)
2001                });
2002
2003                if !compatible {
2004                    continue;
2005                }
2006
2007                // Calculate combined priority
2008                let pair_priority =
2009                    calculate_pair_priority(local_candidate.priority, remote_candidate.priority);
2010
2011                // Classify pair type (local already classified)
2012                let remote_type = classify_candidate_type(remote_candidate.source);
2013                let pair_type = classify_pair_type(local_type, remote_type);
2014
2015                let pair = CandidatePair {
2016                    remote_sequence: *remote_seq,
2017                    local_addr: local_candidate.address,
2018                    remote_addr: remote_candidate.address,
2019                    priority: pair_priority,
2020                    state: PairState::Waiting,
2021                    pair_type,
2022                    created_at: now,
2023                    last_check: None,
2024                };
2025
2026                // Store index for O(1) lookup
2027                let index = self.candidate_pairs.len();
2028                self.pair_index.insert(remote_candidate.address, index);
2029                self.candidate_pairs.push(pair);
2030            }
2031        }
2032
2033        // Sort pairs by priority (highest first) - use unstable sort for better performance
2034        self.candidate_pairs
2035            .sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2036
2037        // Rebuild index after sorting since indices changed
2038        self.pair_index.clear();
2039        for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2040            self.pair_index.insert(pair.remote_addr, idx);
2041        }
2042
2043        trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2044    }
2045
2046    /// Get the highest priority pairs ready for validation
2047    pub(super) fn get_next_validation_pairs(
2048        &mut self,
2049        max_concurrent: usize,
2050    ) -> Vec<&mut CandidatePair> {
2051        // Since pairs are sorted by priority (highest first), we can stop early
2052        // once we find enough waiting pairs or reach lower priority pairs
2053        let mut result = Vec::with_capacity(max_concurrent);
2054        for pair in self.candidate_pairs.iter_mut() {
2055            if pair.state == PairState::Waiting {
2056                result.push(pair);
2057                if result.len() >= max_concurrent {
2058                    break;
2059                }
2060            }
2061        }
2062
2063        result
2064    }
2065
2066    /// Find a candidate pair by remote address
2067    pub(super) fn find_pair_by_remote_addr(
2068        &mut self,
2069        addr: SocketAddr,
2070    ) -> Option<&mut CandidatePair> {
2071        // Use index for O(1) lookup instead of O(n) linear search
2072        if let Some(&index) = self.pair_index.get(&addr) {
2073            self.candidate_pairs.get_mut(index)
2074        } else {
2075            None
2076        }
2077    }
2078    /// Mark a pair as succeeded and handle promotion
2079    pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2080        // Find the pair and get its type and priority
2081        let (succeeded_type, succeeded_priority) = {
2082            if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2083                pair.state = PairState::Succeeded;
2084                (pair.pair_type, pair.priority)
2085            } else {
2086                return false;
2087            }
2088        };
2089        // Freeze lower priority pairs of the same type to avoid unnecessary testing
2090        for other_pair in &mut self.candidate_pairs {
2091            if other_pair.pair_type == succeeded_type
2092                && other_pair.priority < succeeded_priority
2093                && other_pair.state == PairState::Waiting
2094            {
2095                other_pair.state = PairState::Frozen;
2096            }
2097        }
2098
2099        true
2100    }
2101
2102    /// Get the best succeeded pair for each address family
2103    pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2104        let mut best_ipv4: Option<&CandidatePair> = None;
2105        let mut best_ipv6: Option<&CandidatePair> = None;
2106        for pair in &self.candidate_pairs {
2107            if pair.state != PairState::Succeeded {
2108                continue;
2109            }
2110
2111            match pair.remote_addr {
2112                SocketAddr::V4(_) => {
2113                    if best_ipv4.is_none_or(|best| pair.priority > best.priority) {
2114                        best_ipv4 = Some(pair);
2115                    }
2116                }
2117                SocketAddr::V6(_) => {
2118                    if best_ipv6.is_none_or(|best| pair.priority > best.priority) {
2119                        best_ipv6 = Some(pair);
2120                    }
2121                }
2122            }
2123        }
2124
2125        let mut result = Vec::new();
2126        if let Some(pair) = best_ipv4 {
2127            result.push(pair);
2128        }
2129        if let Some(pair) = best_ipv6 {
2130            result.push(pair);
2131        }
2132        result
2133    }
2134
2135    /// Get candidates ready for validation, sorted by priority
2136    pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2137        let mut candidates: Vec<_> = self
2138            .remote_candidates
2139            .iter()
2140            .filter(|(_, c)| c.state == CandidateState::New)
2141            .map(|(k, v)| (*k, v))
2142            .collect();
2143        // Sort by priority (higher priority first)
2144        candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2145        candidates
2146    }
2147
2148    /// Start validation for a candidate address with security checks
2149    pub(super) fn start_validation(
2150        &mut self,
2151        sequence: VarInt,
2152        challenge: u64,
2153        now: Instant,
2154    ) -> Result<(), NatTraversalError> {
2155        let candidate = self
2156            .remote_candidates
2157            .get_mut(&sequence)
2158            .ok_or(NatTraversalError::UnknownCandidate)?;
2159        if candidate.state != CandidateState::New {
2160            return Err(NatTraversalError::InvalidCandidateState);
2161        }
2162
2163        // Security validation: Check for validation abuse patterns
2164        if Self::is_validation_suspicious(candidate, now) {
2165            self.stats.security_rejections += 1;
2166            debug!(
2167                "Suspicious validation attempt rejected for address {}",
2168                candidate.address
2169            );
2170            return Err(NatTraversalError::SecurityValidationFailed);
2171        }
2172
2173        // Security validation: Limit concurrent validations
2174        if self.active_validations.len() >= 10 {
2175            debug!(
2176                "Too many concurrent validations, rejecting new validation for {}",
2177                candidate.address
2178            );
2179            return Err(NatTraversalError::SecurityValidationFailed);
2180        }
2181
2182        // Update candidate state
2183        candidate.state = CandidateState::Validating;
2184        candidate.attempt_count += 1;
2185        candidate.last_attempt = Some(now);
2186
2187        // Track validation state
2188        let validation = PathValidationState {
2189            challenge,
2190            sent_at: now,
2191            retry_count: 0,
2192            max_retries: 3, // TODO: Make configurable
2193            coordination_round: self.coordination.as_ref().map(|c| c.round),
2194            timeout_state: AdaptiveTimeoutState::new(),
2195            last_retry_at: None,
2196        };
2197
2198        self.active_validations
2199            .insert(candidate.address, validation);
2200        trace!(
2201            "Started validation for candidate {} with challenge {}",
2202            candidate.address, challenge
2203        );
2204        Ok(())
2205    }
2206
2207    /// Check if a validation request shows suspicious patterns
2208    fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2209        // Check for excessive retry attempts
2210        if candidate.attempt_count > 10 {
2211            return true;
2212        }
2213        // Check for rapid retry patterns
2214        if let Some(last_attempt) = candidate.last_attempt {
2215            let time_since_last = now.duration_since(last_attempt);
2216            if time_since_last < Duration::from_millis(100) {
2217                return true; // Too frequent attempts
2218            }
2219        }
2220
2221        // Check if this candidate was recently failed
2222        if candidate.state == CandidateState::Failed {
2223            let time_since_discovery = now.duration_since(candidate.discovered_at);
2224            if time_since_discovery < Duration::from_secs(60) {
2225                return true; // Recently failed, shouldn't retry so soon
2226            }
2227        }
2228
2229        false
2230    }
2231
2232    /// Handle successful validation response
2233    pub(super) fn handle_validation_success(
2234        &mut self,
2235        remote_addr: SocketAddr,
2236        challenge: u64,
2237        now: Instant,
2238    ) -> Result<VarInt, NatTraversalError> {
2239        // Find the candidate with this address
2240        let sequence = self
2241            .remote_candidates
2242            .iter()
2243            .find(|(_, c)| c.address == remote_addr)
2244            .map(|(seq, _)| *seq)
2245            .ok_or(NatTraversalError::UnknownCandidate)?;
2246        // Verify challenge matches and update timeout state
2247        let validation = self
2248            .active_validations
2249            .get_mut(&remote_addr)
2250            .ok_or(NatTraversalError::NoActiveValidation)?;
2251
2252        if validation.challenge != challenge {
2253            return Err(NatTraversalError::ChallengeMismatch);
2254        }
2255
2256        // Calculate RTT and update adaptive timeout
2257        let rtt = now.duration_since(validation.sent_at);
2258        validation.timeout_state.update_success(rtt);
2259
2260        // Update network monitor
2261        self.network_monitor.record_success(rtt, now);
2262
2263        // Update candidate state
2264        let candidate = self
2265            .remote_candidates
2266            .get_mut(&sequence)
2267            .ok_or(NatTraversalError::UnknownCandidate)?;
2268
2269        candidate.state = CandidateState::Valid;
2270        self.active_validations.remove(&remote_addr);
2271        self.stats.validations_succeeded += 1;
2272
2273        trace!(
2274            "Validation successful for {} with RTT {:?}",
2275            remote_addr, rtt
2276        );
2277        Ok(sequence)
2278    }
2279
2280    /// Start a new coordination round for simultaneous hole punching with security validation
2281    pub(super) fn start_coordination_round(
2282        &mut self,
2283        targets: Vec<PunchTarget>,
2284        now: Instant,
2285    ) -> Result<VarInt, NatTraversalError> {
2286        // Security validation: Check rate limiting for coordination requests
2287        if self.security_state.is_coordination_rate_limited(now) {
2288            self.stats.rate_limit_violations += 1;
2289            debug!(
2290                "Rate limit exceeded for coordination request with {} targets",
2291                targets.len()
2292            );
2293            return Err(NatTraversalError::RateLimitExceeded);
2294        }
2295        // Security validation: Check for suspicious coordination patterns
2296        if self.is_coordination_suspicious(&targets, now) {
2297            self.stats.suspicious_coordination_attempts += 1;
2298            self.stats.security_rejections += 1;
2299            debug!(
2300                "Suspicious coordination request rejected with {} targets",
2301                targets.len()
2302            );
2303            return Err(NatTraversalError::SuspiciousCoordination);
2304        }
2305
2306        // Security validation: Validate all target addresses
2307        for target in &targets {
2308            match self
2309                .security_state
2310                .validate_address(target.remote_addr, now)
2311            {
2312                AddressValidationResult::Invalid => {
2313                    self.stats.invalid_address_rejections += 1;
2314                    self.stats.security_rejections += 1;
2315                    debug!(
2316                        "Invalid target address in coordination: {}",
2317                        target.remote_addr
2318                    );
2319                    return Err(NatTraversalError::InvalidAddress);
2320                }
2321                AddressValidationResult::Suspicious => {
2322                    self.stats.security_rejections += 1;
2323                    debug!(
2324                        "Suspicious target address in coordination: {}",
2325                        target.remote_addr
2326                    );
2327                    return Err(NatTraversalError::SecurityValidationFailed);
2328                }
2329                AddressValidationResult::Valid => {
2330                    // Continue with normal processing
2331                }
2332            }
2333        }
2334
2335        let round = self.next_sequence;
2336        self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2337            .expect("sequence number overflow");
2338
2339        // Calculate synchronized punch time (grace period for coordination)
2340        let coordination_grace = Duration::from_millis(500); // 500ms for coordination
2341        let punch_start = now + coordination_grace;
2342
2343        self.coordination = Some(CoordinationState {
2344            round,
2345            punch_targets: targets,
2346            round_start: now,
2347            punch_start,
2348            round_duration: self.coordination_timeout,
2349            state: CoordinationPhase::Requesting,
2350            punch_request_sent: false,
2351            peer_punch_received: false,
2352            retry_count: 0,
2353            max_retries: 3,
2354            timeout_state: AdaptiveTimeoutState::new(),
2355            last_retry_at: None,
2356        });
2357
2358        self.stats.coordination_rounds += 1;
2359        trace!(
2360            "Started coordination round {} with {} targets",
2361            round,
2362            self.coordination
2363                .as_ref()
2364                .map(|c| c.punch_targets.len())
2365                .unwrap_or(0)
2366        );
2367        Ok(round)
2368    }
2369
2370    /// Check if a coordination request shows suspicious patterns
2371    fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2372        // Check for excessive number of targets
2373        if targets.len() > 20 {
2374            return true;
2375        }
2376        // Check for duplicate targets
2377        let mut seen_addresses = std::collections::HashSet::new();
2378        for target in targets {
2379            if !seen_addresses.insert(target.remote_addr) {
2380                return true; // Duplicate target
2381            }
2382        }
2383
2384        // Check for patterns that might indicate scanning
2385        if targets.len() > 5 {
2386            // Check if all targets are in sequential IP ranges (potential scan)
2387            let mut ipv4_addresses: Vec<_> = targets
2388                .iter()
2389                .filter_map(|t| match t.remote_addr.ip() {
2390                    IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2391                    _ => None,
2392                })
2393                .collect();
2394
2395            if ipv4_addresses.len() >= 3 {
2396                ipv4_addresses.sort();
2397                let mut sequential_count = 1;
2398                for i in 1..ipv4_addresses.len() {
2399                    if ipv4_addresses[i] == ipv4_addresses[i - 1] + 1 {
2400                        sequential_count += 1;
2401                        if sequential_count >= 3 {
2402                            return true; // Sequential IPs detected
2403                        }
2404                    } else {
2405                        sequential_count = 1;
2406                    }
2407                }
2408            }
2409        }
2410
2411        false
2412    }
2413
2414    /// Get the current coordination phase
2415    pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2416        self.coordination.as_ref().map(|c| c.state)
2417    }
2418    /// Check if we need to send PUNCH_ME_NOW frame
2419    pub(super) fn should_send_punch_request(&self) -> bool {
2420        if let Some(coord) = &self.coordination {
2421            coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2422        } else {
2423            false
2424        }
2425    }
2426    /// Mark that we've sent our PUNCH_ME_NOW request
2427    pub(super) fn mark_punch_request_sent(&mut self) {
2428        if let Some(coord) = &mut self.coordination {
2429            coord.punch_request_sent = true;
2430            coord.state = CoordinationPhase::Coordinating;
2431            trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2432        }
2433    }
2434    /// Handle receiving peer's PUNCH_ME_NOW (via coordinator) with security validation
2435    pub(super) fn handle_peer_punch_request(
2436        &mut self,
2437        peer_round: VarInt,
2438        now: Instant,
2439    ) -> Result<bool, NatTraversalError> {
2440        // Security validation: Check if this is a valid coordination request
2441        if self.is_peer_coordination_suspicious(peer_round, now) {
2442            self.stats.suspicious_coordination_attempts += 1;
2443            self.stats.security_rejections += 1;
2444            debug!(
2445                "Suspicious peer coordination request rejected for round {}",
2446                peer_round
2447            );
2448            return Err(NatTraversalError::SuspiciousCoordination);
2449        }
2450        if let Some(coord) = &mut self.coordination {
2451            if coord.round == peer_round {
2452                match coord.state {
2453                    CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2454                        coord.peer_punch_received = true;
2455                        coord.state = CoordinationPhase::Preparing;
2456
2457                        // Calculate adaptive grace period based on network conditions
2458                        let network_rtt = self
2459                            .network_monitor
2460                            .get_estimated_rtt()
2461                            .unwrap_or(Duration::from_millis(100));
2462                        let quality_score = self.network_monitor.get_quality_score();
2463
2464                        // Scale grace period: good networks get shorter delays
2465                        let base_grace = Duration::from_millis(150);
2466                        let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2467                        let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2468
2469                        let adaptive_grace = Duration::from_millis(
2470                            (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64,
2471                        );
2472
2473                        coord.punch_start = now + adaptive_grace;
2474
2475                        trace!(
2476                            "Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2477                            adaptive_grace, network_rtt, quality_score
2478                        );
2479                        Ok(true)
2480                    }
2481                    CoordinationPhase::Preparing => {
2482                        // Already in preparation phase, just acknowledge
2483                        trace!("Peer coordination confirmed during preparation");
2484                        Ok(true)
2485                    }
2486                    _ => {
2487                        debug!(
2488                            "Received coordination in unexpected phase: {:?}",
2489                            coord.state
2490                        );
2491                        Ok(false)
2492                    }
2493                }
2494            } else {
2495                debug!(
2496                    "Received coordination for wrong round: {} vs {}",
2497                    peer_round, coord.round
2498                );
2499                Ok(false)
2500            }
2501        } else {
2502            debug!("Received peer coordination but no active round");
2503            Ok(false)
2504        }
2505    }
2506
2507    /// Check if a peer coordination request is suspicious
2508    fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2509        // Check for round number anomalies
2510        if peer_round.into_inner() == 0 {
2511            return true; // Invalid round number
2512        }
2513        // Check if round is too far in the future or past
2514        if let Some(coord) = &self.coordination {
2515            let our_round = coord.round.into_inner();
2516            let peer_round_num = peer_round.into_inner();
2517
2518            // Allow some variance but reject extreme differences
2519            if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2520                return true;
2521            }
2522        }
2523
2524        false
2525    }
2526
2527    /// Check if it's time to start hole punching
2528    pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2529        if let Some(coord) = &self.coordination {
2530            match coord.state {
2531                CoordinationPhase::Preparing => now >= coord.punch_start,
2532                CoordinationPhase::Coordinating => {
2533                    // Check if we have peer confirmation and grace period elapsed
2534                    coord.peer_punch_received && now >= coord.punch_start
2535                }
2536                _ => false,
2537            }
2538        } else {
2539            false
2540        }
2541    }
2542    /// Start the synchronized hole punching phase
2543    pub(super) fn start_punching_phase(&mut self, now: Instant) {
2544        if let Some(coord) = &mut self.coordination {
2545            coord.state = CoordinationPhase::Punching;
2546            // Calculate precise timing for coordinated transmission
2547            let network_rtt = self
2548                .network_monitor
2549                .get_estimated_rtt()
2550                .unwrap_or(Duration::from_millis(100));
2551
2552            // Add small random jitter to avoid thundering herd
2553            let jitter_ms: u64 = rand::random::<u64>() % 11;
2554            let jitter = Duration::from_millis(jitter_ms);
2555            let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2556
2557            // Update punch start time with precise calculation
2558            coord.punch_start = transmission_time.max(now);
2559
2560            trace!(
2561                "Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2562                coord.punch_start, network_rtt, jitter
2563            );
2564        }
2565    }
2566
2567    /// Get punch targets for the current round
2568    pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2569        self.coordination
2570            .as_ref()
2571            .map(|c| c.punch_targets.as_slice())
2572    }
2573    /// Mark coordination as validating (PATH_CHALLENGE sent)
2574    pub(super) fn mark_coordination_validating(&mut self) {
2575        if let Some(coord) = &mut self.coordination {
2576            if coord.state == CoordinationPhase::Punching {
2577                coord.state = CoordinationPhase::Validating;
2578                trace!("Coordination moved to validation phase");
2579            }
2580        }
2581    }
2582    /// Handle successful path validation during coordination
2583    pub(super) fn handle_coordination_success(
2584        &mut self,
2585        remote_addr: SocketAddr,
2586        now: Instant,
2587    ) -> bool {
2588        if let Some(coord) = &mut self.coordination {
2589            // Check if this address was one of our punch targets
2590            let was_target = coord
2591                .punch_targets
2592                .iter()
2593                .any(|target| target.remote_addr == remote_addr);
2594            if was_target && coord.state == CoordinationPhase::Validating {
2595                // Calculate RTT and update adaptive timeout
2596                let rtt = now.duration_since(coord.round_start);
2597                coord.timeout_state.update_success(rtt);
2598                self.network_monitor.record_success(rtt, now);
2599
2600                coord.state = CoordinationPhase::Succeeded;
2601                self.stats.direct_connections += 1;
2602                trace!(
2603                    "Coordination succeeded via {} with RTT {:?}",
2604                    remote_addr, rtt
2605                );
2606                true
2607            } else {
2608                false
2609            }
2610        } else {
2611            false
2612        }
2613    }
2614
2615    /// Handle coordination failure and determine if we should retry
2616    pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2617        if let Some(coord) = &mut self.coordination {
2618            coord.retry_count += 1;
2619            coord.timeout_state.update_timeout();
2620            self.network_monitor.record_timeout(now);
2621            // Check network conditions before retrying
2622            if coord.timeout_state.should_retry(coord.max_retries)
2623                && self.network_monitor.is_suitable_for_coordination()
2624            {
2625                // Retry with adaptive timeout
2626                coord.state = CoordinationPhase::Requesting;
2627                coord.punch_request_sent = false;
2628                coord.peer_punch_received = false;
2629                coord.round_start = now;
2630                coord.last_retry_at = Some(now);
2631
2632                // Use adaptive timeout for retry delay
2633                let retry_delay = coord.timeout_state.get_retry_delay();
2634
2635                // Factor in network quality for retry timing
2636                let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2637                let adjusted_delay = Duration::from_millis(
2638                    (retry_delay.as_millis() as f64 * quality_multiplier) as u64,
2639                );
2640
2641                coord.punch_start = now + adjusted_delay;
2642
2643                trace!(
2644                    "Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2645                    coord.round,
2646                    coord.retry_count + 1,
2647                    adjusted_delay,
2648                    self.network_monitor.get_quality_score()
2649                );
2650                true
2651            } else {
2652                coord.state = CoordinationPhase::Failed;
2653                self.stats.coordination_failures += 1;
2654
2655                if !self.network_monitor.is_suitable_for_coordination() {
2656                    trace!(
2657                        "Coordination failed due to poor network conditions (quality: {:.2})",
2658                        self.network_monitor.get_quality_score()
2659                    );
2660                } else {
2661                    trace!("Coordination failed after {} attempts", coord.retry_count);
2662                }
2663                false
2664            }
2665        } else {
2666            false
2667        }
2668    }
2669
2670    /// Check if the current coordination round has timed out
2671    pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2672        if let Some(coord) = &mut self.coordination {
2673            let timeout = coord.timeout_state.get_timeout();
2674            let elapsed = now.duration_since(coord.round_start);
2675            if elapsed > timeout {
2676                trace!(
2677                    "Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2678                    coord.round, elapsed, timeout
2679                );
2680                self.handle_coordination_failure(now);
2681                true
2682            } else {
2683                false
2684            }
2685        } else {
2686            false
2687        }
2688    }
2689
2690    /// Check for validation timeouts and handle retries
2691    pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2692        let mut expired_validations = Vec::new();
2693        let mut retry_validations = Vec::new();
2694
2695        for (addr, validation) in &mut self.active_validations {
2696            let timeout = validation.timeout_state.get_timeout();
2697            let elapsed = now.duration_since(validation.sent_at);
2698
2699            if elapsed >= timeout {
2700                if validation
2701                    .timeout_state
2702                    .should_retry(validation.max_retries)
2703                {
2704                    // Schedule retry
2705                    retry_validations.push(*addr);
2706                } else {
2707                    // Mark as expired
2708                    expired_validations.push(*addr);
2709                }
2710            }
2711        }
2712
2713        // Handle retries
2714        for addr in retry_validations {
2715            if let Some(validation) = self.active_validations.get_mut(&addr) {
2716                validation.retry_count += 1;
2717                validation.sent_at = now;
2718                validation.last_retry_at = Some(now);
2719                validation.timeout_state.update_timeout();
2720
2721                trace!(
2722                    "Retrying validation for {} (attempt {})",
2723                    addr,
2724                    validation.retry_count + 1
2725                );
2726            }
2727        }
2728
2729        // Remove expired validations
2730        for addr in &expired_validations {
2731            self.active_validations.remove(addr);
2732            self.network_monitor.record_timeout(now);
2733            trace!("Validation expired for {}", addr);
2734        }
2735
2736        expired_validations
2737    }
2738
2739    /// Schedule validation retries for active validations that need retry
2740    pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2741        let mut retry_addresses = Vec::new();
2742
2743        // Get all active validations that need retry
2744        for (addr, validation) in &mut self.active_validations {
2745            let elapsed = now.duration_since(validation.sent_at);
2746            let timeout = validation.timeout_state.get_timeout();
2747
2748            if elapsed > timeout
2749                && validation
2750                    .timeout_state
2751                    .should_retry(validation.max_retries)
2752            {
2753                // Update retry state
2754                validation.retry_count += 1;
2755                validation.last_retry_at = Some(now);
2756                validation.sent_at = now; // Reset sent time for new attempt
2757                validation.timeout_state.update_timeout();
2758
2759                retry_addresses.push(*addr);
2760                trace!(
2761                    "Scheduled retry {} for validation to {}",
2762                    validation.retry_count, addr
2763                );
2764            }
2765        }
2766
2767        retry_addresses
2768    }
2769
2770    /// Update network conditions and cleanup
2771    pub(super) fn update_network_conditions(&mut self, now: Instant) {
2772        self.network_monitor.cleanup(now);
2773
2774        // Update timeout multiplier based on network conditions
2775        let multiplier = self.network_monitor.get_timeout_multiplier();
2776
2777        // Apply network-aware timeout adjustments to active validations
2778        for validation in self.active_validations.values_mut() {
2779            if multiplier > 1.5 {
2780                // Poor network conditions - be more patient
2781                validation.timeout_state.backoff_multiplier =
2782                    (validation.timeout_state.backoff_multiplier * 1.2)
2783                        .min(validation.timeout_state.max_backoff_multiplier);
2784            } else if multiplier < 0.8 {
2785                // Good network conditions - be more aggressive
2786                validation.timeout_state.backoff_multiplier =
2787                    (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2788            }
2789        }
2790    }
2791
2792    /// Check if coordination should be retried now
2793    pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2794        if let Some(coord) = &self.coordination {
2795            if coord.retry_count > 0 {
2796                if let Some(last_retry) = coord.last_retry_at {
2797                    let retry_delay = coord.timeout_state.get_retry_delay();
2798                    return now.duration_since(last_retry) >= retry_delay;
2799                }
2800            }
2801        }
2802        false
2803    }
2804
2805    /// Perform resource management and cleanup
2806    pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2807        // Update resource usage statistics
2808        self.resource_manager.update_stats(
2809            self.active_validations.len(),
2810            self.local_candidates.len(),
2811            self.remote_candidates.len(),
2812            self.candidate_pairs.len(),
2813        );
2814
2815        // Calculate current memory pressure
2816        let memory_pressure = self.resource_manager.calculate_memory_pressure(
2817            self.active_validations.len(),
2818            self.local_candidates.len(),
2819            self.remote_candidates.len(),
2820            self.candidate_pairs.len(),
2821        );
2822
2823        // Perform cleanup if needed
2824        let mut cleaned = 0;
2825
2826        if self.resource_manager.should_cleanup(now) {
2827            cleaned += self.resource_manager.cleanup_expired_resources(
2828                &mut self.active_validations,
2829                &mut self.local_candidates,
2830                &mut self.remote_candidates,
2831                &mut self.candidate_pairs,
2832                &mut self.coordination,
2833                now,
2834            );
2835
2836            // If memory pressure is high, perform aggressive cleanup
2837            if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2838                cleaned += self.resource_manager.aggressive_cleanup(
2839                    &mut self.active_validations,
2840                    &mut self.local_candidates,
2841                    &mut self.remote_candidates,
2842                    &mut self.candidate_pairs,
2843                    now,
2844                );
2845            }
2846        }
2847
2848        cleaned
2849    }
2850
2851    /// Check if we should reject new resources due to limits
2852    pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
2853        // Update stats and check limits
2854        self.resource_manager.update_stats(
2855            self.active_validations.len(),
2856            self.local_candidates.len(),
2857            self.remote_candidates.len(),
2858            self.candidate_pairs.len(),
2859        );
2860        let memory_pressure = self.resource_manager.calculate_memory_pressure(
2861            self.active_validations.len(),
2862            self.local_candidates.len(),
2863            self.remote_candidates.len(),
2864            self.candidate_pairs.len(),
2865        );
2866        // Reject if memory pressure is too high
2867        if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
2868            self.resource_manager.stats.allocation_failures += 1;
2869            return true;
2870        }
2871
2872        // Reject if hard limits are exceeded
2873        if self.resource_manager.check_resource_limits(self) {
2874            self.resource_manager.stats.allocation_failures += 1;
2875            return true;
2876        }
2877
2878        false
2879    }
2880
2881    /// Get the next timeout instant for NAT traversal operations
2882    pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
2883        let mut next_timeout = None;
2884        // Check coordination timeout
2885        if let Some(coord) = &self.coordination {
2886            match coord.state {
2887                CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2888                    let timeout_at = coord.round_start + self.coordination_timeout;
2889                    next_timeout =
2890                        Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2891                }
2892                CoordinationPhase::Preparing => {
2893                    // Punch start time is when we should start punching
2894                    next_timeout = Some(
2895                        next_timeout
2896                            .map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)),
2897                    );
2898                }
2899                CoordinationPhase::Punching | CoordinationPhase::Validating => {
2900                    // Check for coordination round timeout
2901                    let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2902                    next_timeout =
2903                        Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2904                }
2905                _ => {}
2906            }
2907        }
2908
2909        // Check validation timeouts
2910        for validation in self.active_validations.values() {
2911            let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2912            next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2913        }
2914
2915        // Check resource cleanup interval
2916        if self.resource_manager.should_cleanup(now) {
2917            // Schedule cleanup soon
2918            let cleanup_at = now + Duration::from_secs(1);
2919            next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
2920        }
2921
2922        next_timeout
2923    }
2924
2925    /// Handle timeout events and return actions to take
2926    pub(super) fn handle_timeout(
2927        &mut self,
2928        now: Instant,
2929    ) -> Result<Vec<TimeoutAction>, NatTraversalError> {
2930        let mut actions = Vec::new();
2931        // Handle coordination timeouts
2932        if let Some(coord) = &mut self.coordination {
2933            match coord.state {
2934                CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2935                    let timeout_at = coord.round_start + self.coordination_timeout;
2936                    if now >= timeout_at {
2937                        coord.retry_count += 1;
2938                        if coord.retry_count >= coord.max_retries {
2939                            debug!("Coordination failed after {} retries", coord.retry_count);
2940                            coord.state = CoordinationPhase::Failed;
2941                            actions.push(TimeoutAction::Failed);
2942                        } else {
2943                            debug!(
2944                                "Coordination timeout, retrying ({}/{})",
2945                                coord.retry_count, coord.max_retries
2946                            );
2947                            coord.state = CoordinationPhase::Requesting;
2948                            coord.round_start = now;
2949                            actions.push(TimeoutAction::RetryCoordination);
2950                        }
2951                    }
2952                }
2953                CoordinationPhase::Preparing => {
2954                    // Check if it's time to start punching
2955                    if now >= coord.punch_start {
2956                        debug!("Starting coordinated hole punching");
2957                        coord.state = CoordinationPhase::Punching;
2958                        actions.push(TimeoutAction::StartValidation);
2959                    }
2960                }
2961                CoordinationPhase::Punching | CoordinationPhase::Validating => {
2962                    let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2963                    if now >= timeout_at {
2964                        coord.retry_count += 1;
2965                        if coord.retry_count >= coord.max_retries {
2966                            debug!("Validation failed after {} retries", coord.retry_count);
2967                            coord.state = CoordinationPhase::Failed;
2968                            actions.push(TimeoutAction::Failed);
2969                        } else {
2970                            debug!(
2971                                "Validation timeout, retrying ({}/{})",
2972                                coord.retry_count, coord.max_retries
2973                            );
2974                            coord.state = CoordinationPhase::Punching;
2975                            actions.push(TimeoutAction::StartValidation);
2976                        }
2977                    }
2978                }
2979                CoordinationPhase::Succeeded => {
2980                    actions.push(TimeoutAction::Complete);
2981                }
2982                CoordinationPhase::Failed => {
2983                    actions.push(TimeoutAction::Failed);
2984                }
2985                _ => {}
2986            }
2987        }
2988
2989        // Handle validation timeouts
2990        let mut expired_validations = Vec::new();
2991        for (addr, validation) in &mut self.active_validations {
2992            let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2993            if now >= timeout_at {
2994                validation.retry_count += 1;
2995                if validation.retry_count >= validation.max_retries {
2996                    debug!("Path validation failed for {}: max retries exceeded", addr);
2997                    expired_validations.push(*addr);
2998                } else {
2999                    debug!(
3000                        "Path validation timeout for {}, retrying ({}/{})",
3001                        addr, validation.retry_count, validation.max_retries
3002                    );
3003                    validation.sent_at = now;
3004                    validation.last_retry_at = Some(now);
3005                    actions.push(TimeoutAction::StartValidation);
3006                }
3007            }
3008        }
3009
3010        // Remove expired validations
3011        for addr in expired_validations {
3012            self.active_validations.remove(&addr);
3013        }
3014
3015        // Handle resource cleanup
3016        if self.resource_manager.should_cleanup(now) {
3017            self.resource_manager.perform_cleanup(now);
3018        }
3019
3020        // Update network condition monitoring
3021        self.network_monitor.update_quality_score(now);
3022
3023        // If no coordination is active and we have candidates, try to start discovery
3024        if self.coordination.is_none()
3025            && !self.local_candidates.is_empty()
3026            && !self.remote_candidates.is_empty()
3027        {
3028            actions.push(TimeoutAction::RetryDiscovery);
3029        }
3030
3031        Ok(actions)
3032    }
3033
3034    /// Handle address observation for P2P nodes
3035    ///
3036    /// This method is called when a peer connects, allowing this node
3037    /// to observe the peer's public address. v0.13.0: All nodes can observe
3038    /// addresses - no bootstrap role required.
3039    pub(super) fn handle_address_observation(
3040        &mut self,
3041        peer_id: [u8; 32],
3042        observed_address: SocketAddr,
3043        connection_id: crate::shared::ConnectionId,
3044        now: Instant,
3045    ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
3046        if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3047            let connection_context = ConnectionContext {
3048                connection_id,
3049                original_destination: observed_address, // For now, use same as observed
3050                                                        // v0.13.0: peer_role removed - all nodes are symmetric
3051            };
3052
3053            // Observe the peer's address
3054            bootstrap_coordinator.observe_peer_address(
3055                peer_id,
3056                observed_address,
3057                connection_context,
3058                now,
3059            )?;
3060
3061            // Generate ADD_ADDRESS frame to inform peer of their observed address
3062            let sequence = self.next_sequence;
3063            self.next_sequence =
3064                VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3065
3066            let priority = VarInt::from_u32(100); // Server-reflexive priority
3067            let add_address_frame =
3068                bootstrap_coordinator.generate_add_address_frame(peer_id, sequence, priority);
3069
3070            Ok(add_address_frame)
3071        } else {
3072            // Not a bootstrap node
3073            Ok(None)
3074        }
3075    }
3076
3077    /// Handle PUNCH_ME_NOW frame for bootstrap coordination
3078    ///
3079    /// This processes coordination requests from peers and facilitates
3080    /// hole punching between them.
3081    pub(super) fn handle_punch_me_now_frame(
3082        &mut self,
3083        from_peer: [u8; 32],
3084        source_addr: SocketAddr,
3085        frame: &crate::frame::PunchMeNow,
3086        now: Instant,
3087    ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3088        if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3089            bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3090        } else {
3091            // Not a bootstrap node - this frame should not be processed here
3092            Ok(None)
3093        }
3094    }
3095    /// Perform bootstrap cleanup operations
3096    ///
3097    /// Get observed address for a peer
3098    pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3099        self.bootstrap_coordinator
3100            .as_ref()
3101            .and_then(|coord| coord.peer_index.get(&peer_id).map(|p| p.observed_addr))
3102    }
3103
3104    /// Start candidate discovery process
3105    pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3106        debug!("Starting candidate discovery for NAT traversal");
3107        // Initialize discovery state if needed
3108        if self.local_candidates.is_empty() {
3109            // Add local interface candidates
3110            // This would be populated by the candidate discovery manager
3111            debug!("Local candidates will be populated by discovery manager");
3112        }
3113
3114        Ok(())
3115    }
3116
3117    /// Queue an ADD_ADDRESS frame for transmission
3118    pub(super) fn queue_add_address_frame(
3119        &mut self,
3120        sequence: VarInt,
3121        address: SocketAddr,
3122        priority: u32,
3123    ) -> Result<(), NatTraversalError> {
3124        debug!(
3125            "Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3126            sequence, address, priority
3127        );
3128
3129        // Add to local candidates if not already present
3130        let candidate = AddressCandidate {
3131            address,
3132            priority,
3133            source: CandidateSource::Local,
3134            discovered_at: Instant::now(),
3135            state: CandidateState::New,
3136            attempt_count: 0,
3137            last_attempt: None,
3138        };
3139
3140        // Check if candidate already exists
3141        if !self.local_candidates.values().any(|c| c.address == address) {
3142            self.local_candidates.insert(sequence, candidate);
3143        }
3144
3145        Ok(())
3146    }
3147}
3148
/// Failure modes reported by the NAT traversal code.
///
/// The type is `Copy` and comparable, so callers can match on it or compare
/// it directly. Its `Display` output is a short lowercase phrase per variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub(crate) enum NatTraversalError {
    /// More candidates were received than are accepted
    TooManyCandidates,
    /// The same address was supplied under a different sequence number
    DuplicateAddress,
    /// The referenced candidate sequence is not known
    UnknownCandidate,
    /// The candidate is not in a state that permits the operation
    InvalidCandidateState,
    /// No validation is in progress for the address
    NoActiveValidation,
    /// The challenge value did not match
    ChallengeMismatch,
    /// No coordination round is active
    NoActiveCoordination,
    /// Security validation failed
    SecurityValidationFailed,
    /// A rate limit was exceeded
    RateLimitExceeded,
    /// The address is not acceptable
    InvalidAddress,
    /// The coordination request looked suspicious
    SuspiciousCoordination,
    /// A resource limit was exceeded
    ResourceLimitExceeded,
}

impl std::fmt::Display for NatTraversalError {
    /// Writes the fixed message for the variant; formatter width and fill
    /// options are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::TooManyCandidates => "too many candidates",
            Self::DuplicateAddress => "duplicate address",
            Self::UnknownCandidate => "unknown candidate",
            Self::InvalidCandidateState => "invalid candidate state",
            Self::NoActiveValidation => "no active validation",
            Self::ChallengeMismatch => "challenge mismatch",
            Self::NoActiveCoordination => "no active coordination",
            Self::SecurityValidationFailed => "security validation failed",
            Self::RateLimitExceeded => "rate limit exceeded",
            Self::InvalidAddress => "invalid address",
            Self::SuspiciousCoordination => "suspicious coordination request",
            Self::ResourceLimitExceeded => "resource limit exceeded",
        };
        f.write_str(message)
    }
}

impl std::error::Error for NatTraversalError {}
3198
/// Security statistics for monitoring and debugging
///
/// Plain data holder with public fields: four `u32` rejection counters
/// followed by four `usize` size/rate readings. The code that fills it in is
/// not part of this section of the file.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct SecurityStats {
    /// Total number of requests rejected for security reasons
    pub total_security_rejections: u32,
    /// Number of rate limiting violations
    pub rate_limit_violations: u32,
    /// Number of rejections caused by an invalid address
    pub invalid_address_rejections: u32,
    /// Number of coordination attempts classed as suspicious
    pub suspicious_coordination_attempts: u32,
    /// Number of active validations
    pub active_validations: usize,
    /// Number of cached address validations
    pub cached_address_validations: usize,
    /// Current candidate addition rate
    pub current_candidate_rate: usize,
    /// Current coordination request rate
    pub current_coordination_rate: usize,
}
/// Bootstrap coordinator state machine for NAT traversal coordination
///
/// This manages the bootstrap node's role in observing client addresses,
/// coordinating hole punching, and relaying coordination messages.
///
/// Only minimal caches are kept: one map of observed addresses, one map from
/// peer id to its observed address, and one map of coordination rounds.
///
/// NOTE(review): no code in the `impl` below removes entries from any of the
/// three maps (the cleanup methods are empty stubs) — confirm their growth is
/// bounded elsewhere.
#[derive(Debug)]
pub(crate) struct BootstrapCoordinator {
    /// Address observation cache for quick lookups, keyed by the observed
    /// socket address; created and updated by `observe_peer_address`
    address_observations: HashMap<SocketAddr, AddressObservation>,
    /// Quick lookup by peer id for the last observed address; overwritten on
    /// every accepted observation and read by `generate_add_address_frame`
    peer_index: HashMap<PeerId, ObservedPeer>,
    /// Minimal coordination table keyed by round id; written by
    /// `process_punch_me_now_frame`
    coordination_table: HashMap<VarInt, CoordinationEntry>,
    /// Security validator for coordination requests (address validation and
    /// rate limiting are delegated to it)
    security_validator: SecurityValidationState,
    /// Statistics for bootstrap operations
    stats: BootstrapStats,
}
3237// Removed legacy CoordinationSessionId type
/// Peer identifier for bootstrap coordination: a fixed 32-byte value used as
/// the key of the coordinator's peer index.
type PeerId = [u8; 32];
/// Observed peer summary (minimal index)
///
/// Value type of `BootstrapCoordinator::peer_index`; it holds only the
/// address stored by the latest accepted `observe_peer_address` call for the
/// peer.
#[derive(Debug, Clone)]
struct ObservedPeer {
    /// Address the peer was observed at
    observed_addr: SocketAddr,
}
3245
/// Minimal coordination record linking two peers for a round
///
/// Stored in `BootstrapCoordinator::coordination_table` under the round id of
/// the PUNCH_ME_NOW frame that created it.
#[derive(Debug, Clone)]
struct CoordinationEntry {
    /// Target peer named by a PUNCH_ME_NOW frame for this round; `None` until
    /// a frame carrying a target peer id has been seen
    peer_b: Option<PeerId>,
    /// Address carried by the frame that created the entry, replaced by the
    /// address of any later frame for the round that names a target peer
    address_hint: SocketAddr,
}
/// Record of observed peer information
///
/// Return type of `BootstrapCoordinator::get_peer_record`, which always
/// returns `None`; nothing in the visible part of this file constructs a
/// value of this type.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct PeerObservationRecord {
    /// The peer's unique identifier
    peer_id: PeerId,
    /// Last observed public address
    observed_address: SocketAddr,
    /// When this observation was made
    observed_at: Instant,
    /// Connection context for this observation
    connection_context: ConnectionContext,
    /// Whether this peer can participate in coordination
    can_coordinate: bool,
    /// Number of successful coordinations
    coordination_count: u32,
    /// Average coordination success rate
    // NOTE(review): scale is not shown here (presumably a 0.0..=1.0 fraction) — confirm
    success_rate: f64,
}
3271
/// Connection context for address observations
///
/// Passed to `BootstrapCoordinator::observe_peer_address`, which currently
/// accepts it without reading it.
///
/// v0.13.0: peer_role field removed - all nodes are symmetric P2P nodes.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct ConnectionContext {
    /// Connection ID for this observation
    connection_id: ConnectionId,
    /// Original destination address (what peer thought it was connecting to);
    /// the caller visible in this file fills it with the observed address
    original_destination: SocketAddr,
    // v0.13.0: peer_role field removed - all nodes are symmetric P2P nodes
}
3284
3285// Transport parameters for NAT traversal removed (legacy)
3286
/// Address observation with validation
///
/// One entry per observed socket address in
/// `BootstrapCoordinator::address_observations`.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct AddressObservation {
    /// The observed address (same value as the map key)
    address: SocketAddr,
    /// When this address was first observed (timestamp of the call that
    /// created the entry)
    first_observed: Instant,
    /// How many times this address has been observed; incremented once per
    /// accepted observation
    observation_count: u32,
    /// Validation state for this address; entries are created as `Valid`
    /// because rejected addresses never reach the cache
    validation_state: AddressValidationResult,
    /// Associated peer IDs for this address, without duplicates
    associated_peers: Vec<PeerId>,
}
3302
3303// Removed coordination session scaffolding
/// Configuration for bootstrap coordinator behavior (stub implementation)
///
/// Carries no settings: its only field is a unit placeholder, and
/// `BootstrapCoordinator::new` does not read the value it is given.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapConfig {
    // Placeholder so the struct keeps a private field.
    _unused: (),
}
/// Statistics for bootstrap operations
///
/// All counters start at zero (`Default`) and are only ever incremented by
/// `BootstrapCoordinator`.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapStats {
    /// Total address observations made (accepted `observe_peer_address` calls)
    total_observations: u64,
    /// Total coordination sessions facilitated; incremented when a
    /// PUNCH_ME_NOW frame naming a target peer is answered with a frame
    total_coordinations: u64,
    /// Successful coordinations; incremented when an accepted PUNCH_ME_NOW
    /// frame carries no target peer
    successful_coordinations: u64,
    /// Security rejections: failed address validation, rate limiting, or
    /// failed frame validation
    security_rejections: u64,
}
3322// Removed session state machine enums and recovery actions
3323impl BootstrapCoordinator {
3324    /// Create a new bootstrap coordinator
3325    pub(crate) fn new(_config: BootstrapConfig) -> Self {
3326        Self {
3327            address_observations: HashMap::new(),
3328            peer_index: HashMap::new(),
3329            coordination_table: HashMap::new(),
3330            security_validator: SecurityValidationState::new(),
3331            stats: BootstrapStats::default(),
3332        }
3333    }
3334    /// Observe a peer's address from an incoming connection
3335    ///
3336    /// This is called when a peer connects to this bootstrap node,
3337    /// allowing us to observe their public address.
3338    pub(crate) fn observe_peer_address(
3339        &mut self,
3340        peer_id: PeerId,
3341        observed_address: SocketAddr,
3342        _connection_context: ConnectionContext,
3343        now: Instant,
3344    ) -> Result<(), NatTraversalError> {
3345        // Security validation
3346        match self
3347            .security_validator
3348            .validate_address(observed_address, now)
3349        {
3350            AddressValidationResult::Valid => {}
3351            AddressValidationResult::Invalid => {
3352                self.stats.security_rejections += 1;
3353                return Err(NatTraversalError::InvalidAddress);
3354            }
3355            AddressValidationResult::Suspicious => {
3356                self.stats.security_rejections += 1;
3357                return Err(NatTraversalError::SecurityValidationFailed);
3358            }
3359        }
3360
3361        // Rate limiting check
3362        if self.security_validator.is_candidate_rate_limited(now) {
3363            self.stats.security_rejections += 1;
3364            return Err(NatTraversalError::RateLimitExceeded);
3365        }
3366
3367        // Update address observation
3368        let observation = self
3369            .address_observations
3370            .entry(observed_address)
3371            .or_insert_with(|| AddressObservation {
3372                address: observed_address,
3373                first_observed: now,
3374                observation_count: 0,
3375                validation_state: AddressValidationResult::Valid,
3376                associated_peers: Vec::new(),
3377            });
3378
3379        observation.observation_count += 1;
3380        if !observation.associated_peers.contains(&peer_id) {
3381            observation.associated_peers.push(peer_id);
3382        }
3383
3384        // Update minimal peer index for quick lookups
3385        self.peer_index.insert(
3386            peer_id,
3387            ObservedPeer {
3388                observed_addr: observed_address,
3389            },
3390        );
3391
3392        // Note: Full peer registry and session scaffolding removed; we keep only minimal caches
3393        self.stats.total_observations += 1;
3394        // active_peers removed from stats
3395
3396        debug!(
3397            "Observed peer {:?} at address {} (total observations: {})",
3398            peer_id, observed_address, self.stats.total_observations
3399        );
3400
3401        Ok(())
3402    }
3403
3404    /// Generate ADD_ADDRESS frame for a peer based on observation
3405    ///
3406    /// This creates an ADD_ADDRESS frame to inform a peer of their
3407    /// observed public address.
3408    pub(crate) fn generate_add_address_frame(
3409        &self,
3410        peer_id: PeerId,
3411        sequence: VarInt,
3412        priority: VarInt,
3413    ) -> Option<crate::frame::AddAddress> {
3414        let addr = self.peer_index.get(&peer_id)?.observed_addr;
3415        Some(crate::frame::AddAddress {
3416            sequence,
3417            address: addr,
3418            priority,
3419        })
3420    }
3421
3422    /// Process a PUNCH_ME_NOW frame from a peer
3423    ///
3424    /// This handles coordination requests from peers wanting to establish
3425    /// direct connections through NAT traversal.
3426    pub(crate) fn process_punch_me_now_frame(
3427        &mut self,
3428        from_peer: PeerId,
3429        source_addr: SocketAddr,
3430        frame: &crate::frame::PunchMeNow,
3431        now: Instant,
3432    ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3433        // Enhanced security validation with adaptive rate limiting
3434        if self
3435            .security_validator
3436            .is_adaptive_rate_limited(from_peer, now)
3437        {
3438            self.stats.security_rejections += 1;
3439            debug!(
3440                "PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3441                hex::encode(&from_peer[..8])
3442            );
3443            return Err(NatTraversalError::RateLimitExceeded);
3444        }
3445        // Enhanced address validation with amplification protection
3446        self.security_validator
3447            .enhanced_address_validation(frame.address, source_addr, now)
3448            .inspect_err(|&e| {
3449                self.stats.security_rejections += 1;
3450                debug!(
3451                    "PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3452                    hex::encode(&from_peer[..8]),
3453                    e
3454                );
3455            })?;
3456
3457        // Comprehensive security validation
3458        self.security_validator
3459            .validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3460            .inspect_err(|&e| {
3461                self.stats.security_rejections += 1;
3462                debug!(
3463                    "PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3464                    hex::encode(&from_peer[..8]),
3465                    e
3466                );
3467            })?;
3468
3469        // Track coordination entry minimally
3470        let _entry = self
3471            .coordination_table
3472            .entry(frame.round)
3473            .or_insert(CoordinationEntry {
3474                peer_b: frame.target_peer_id,
3475                address_hint: frame.address,
3476            });
3477        // Update target if provided later
3478        if let Some(peer_b) = frame.target_peer_id {
3479            if _entry.peer_b.is_none() {
3480                _entry.peer_b = Some(peer_b);
3481            }
3482            _entry.address_hint = frame.address;
3483        }
3484
3485        // If we have a target, echo back with swapped target to coordinate
3486        if let Some(_target_peer_id) = frame.target_peer_id {
3487            let coordination_frame = crate::frame::PunchMeNow {
3488                round: frame.round,
3489                paired_with_sequence_number: frame.paired_with_sequence_number,
3490                address: frame.address,
3491                target_peer_id: Some(from_peer),
3492            };
3493            self.stats.total_coordinations += 1;
3494            Ok(Some(coordination_frame))
3495        } else {
3496            // Response path: increment success metric
3497            self.stats.successful_coordinations += 1;
3498            Ok(None)
3499        }
3500    }
3501
3502    // Removed legacy session tracking helpers
3503    // Generate secure coordination round using cryptographically secure random values (legacy removed)
3504
3505    // Perform comprehensive security validation for coordination requests (legacy removed)
3506
3507    #[allow(dead_code)]
3508    pub(crate) fn cleanup_expired_sessions(&mut self, _now: Instant) {}
3509
3510    // Get bootstrap statistics (legacy removed)
3511
3512    // Removed peer coordination success-rate tracking and full registry
3513
3514    #[allow(dead_code)]
3515    pub(crate) fn poll_session_state_machine(&mut self, _now: Instant) -> Vec<()> {
3516        // Legacy session state machine removed
3517        Vec::new()
3518    }
3519
3520    // Check if a session should advance its state (legacy removed)
3521    // Advance session state based on event (legacy removed)
3522
3523    #[allow(dead_code)]
3524    fn cleanup_completed_sessions(&mut self, _now: Instant) {}
3525
3526    // Legacy retry mechanism removed
3527
3528    // Handle coordination errors with appropriate recovery strategies (legacy removed)
3529
3530    #[allow(dead_code)]
3531    fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
3532        // Simple estimation based on peer record
3533        // In a real implementation, this would use historical RTT data
3534        let _ = peer_id;
3535        None
3536    }
3537    // Coordinate hole punching between two peers (legacy removed)
3538    // This method implemented the core coordination logic for establishing
3539    // direct P2P connections through NAT traversal.
3540
3541    // Relay coordination frame between peers (legacy removed)
3542    // This method handled the relay of coordination messages between peers
3543    // to facilitate synchronized hole punching.
3544
3545    // Implement round-based synchronization protocol (legacy removed)
3546    // This managed the timing and synchronization of hole punching rounds
3547    // to maximize the chances of successful NAT traversal.
3548
3549    // Get coordination session by ID (legacy removed)
3550
3551    // Get mutable coordination session by ID (legacy removed)
3552
3553    // Mark coordination session as successful (legacy removed)
3554
3555    // Mark coordination session as failed (legacy removed)
3556
3557    #[allow(dead_code)]
3558    pub(crate) fn get_peer_record(&self, _peer_id: PeerId) -> Option<&PeerObservationRecord> {
3559        // Legacy API kept for callers; we no longer maintain full records
3560        None
3561    }
3562}
3563
3564// Multi-destination packet transmission manager for NAT traversal
3565//
3566// This component handles simultaneous packet transmission to multiple candidate
3567// addresses during hole punching attempts, maximizing the chances of successful
3568// NAT traversal by sending packets to all viable destinations concurrently.
3569// TODO: Implement multi-path transmission infrastructure when needed
3570// This would include MultiDestinationTransmitter for sending packets to multiple
3571// destinations simultaneously for improved NAT traversal success rates.
3572// TODO: Fix nat_traversal_tests module imports
3573// #[cfg(test)]
3574// #[path = "nat_traversal_tests.rs"]
3575// mod tests;
3576
#[cfg(test)]
mod tests {
    use super::*;

    /// State under test: room for 10 candidates, 30 second coordination timeout.
    ///
    /// v0.13.0: no role parameter - all nodes are symmetric P2P nodes.
    fn create_test_state() -> NatTraversalState {
        let max_candidates = 10;
        let coordination_timeout = Duration::from_secs(30);
        NatTraversalState::new(max_candidates, coordination_timeout)
    }

    /// Candidate source used for QUIC-discovered (observed) addresses.
    fn observed() -> CandidateSource {
        CandidateSource::Observed { by_node: None }
    }

    /// A single observed address ends up as a new local candidate.
    #[test]
    fn test_add_quic_discovered_address() {
        let mut state = create_test_state();
        let now = Instant::now();

        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5678));
        let seq = state.add_local_candidate(discovered_addr, observed(), now);

        // Exactly one candidate, stored under the returned sequence number.
        assert_eq!(state.local_candidates.len(), 1);
        let stored = state.local_candidates.get(&seq).unwrap();
        assert_eq!(stored.address, discovered_addr);
        assert!(matches!(stored.source, CandidateSource::Observed { .. }));
        assert_eq!(stored.state, CandidateState::New);

        // A non-zero priority must have been assigned.
        assert!(stored.priority > 0);
    }

    /// Several observed addresses (IPv4 and IPv6) are all stored.
    #[test]
    fn test_add_multiple_quic_discovered_addresses() {
        let mut state = create_test_state();
        let now = Instant::now();

        // NOTE(review): the first IPv6 segment is decimal 2001 (0x07d1), not
        // 0x2001, so this is not 2001:db8::1 - confirm that is intended.
        let addrs = [
            SocketAddr::from(([1, 2, 3, 4], 5678)),
            SocketAddr::from(([5, 6, 7, 8], 9012)),
            SocketAddr::from(([2001, 0xdb8, 0, 0, 0, 0, 0, 1], 443)),
        ];

        let sequences: Vec<_> = addrs
            .iter()
            .map(|addr| state.add_local_candidate(*addr, observed(), now))
            .collect();

        assert_eq!(state.local_candidates.len(), 3);

        // Each sequence number maps back to the address it was issued for.
        for (seq, addr) in sequences.iter().zip(&addrs) {
            let stored = state.local_candidates.get(seq).unwrap();
            assert_eq!(stored.address, *addr);
            assert!(matches!(stored.source, CandidateSource::Observed { .. }));
        }
    }

    /// An observed address is available among the local candidates.
    #[test]
    fn test_quic_discovered_addresses_in_local_candidates() {
        let mut state = create_test_state();
        let now = Instant::now();

        let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        let seq = state.add_local_candidate(addr, observed(), now);

        assert!(state.local_candidates.contains_key(&seq));
        let stored = state.local_candidates.get(&seq).unwrap();
        assert_eq!(stored.address, addr);

        // The candidate keeps its observed source.
        assert!(matches!(stored.source, CandidateSource::Observed { .. }));
    }

    /// An observed local address is paired with a remote candidate.
    #[test]
    fn test_quic_discovered_addresses_included_in_hole_punching() {
        let mut state = create_test_state();
        let now = Instant::now();

        let local_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        state.add_local_candidate(local_addr, observed(), now);

        // Remote candidate uses a valid public IP, not a documentation range.
        let remote_addr = SocketAddr::from(([1, 2, 3, 4], 6000));
        let remote_priority = VarInt::from_u32(100);
        state
            .add_remote_candidate(VarInt::from_u32(1), remote_addr, remote_priority, now)
            .expect("add remote candidate should succeed");

        state.generate_candidate_pairs(now);

        // One local and one remote candidate give exactly one pair.
        assert_eq!(state.candidate_pairs.len(), 1);
        let only_pair = &state.candidate_pairs[0];
        assert_eq!(only_pair.local_addr, local_addr);
        assert_eq!(only_pair.remote_addr, remote_addr);
    }

    /// An observed address ranks at least as high as a predicted one.
    #[test]
    fn test_prioritize_quic_discovered_over_predicted() {
        let mut state = create_test_state();
        let now = Instant::now();

        let predicted_addr = SocketAddr::from(([1, 2, 3, 4], 5000));
        let predicted_seq =
            state.add_local_candidate(predicted_addr, CandidateSource::Predicted, now);

        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5001));
        let discovered_seq = state.add_local_candidate(discovered_addr, observed(), now);

        let priority_of = |seq| state.local_candidates.get(&seq).unwrap().priority;
        let predicted_priority = priority_of(predicted_seq);
        let discovered_priority = priority_of(discovered_seq);

        // Both are server-reflexive type; the comparison is `>=`, so equal
        // priorities pass as well.
        assert!(discovered_priority >= predicted_priority);
    }

    /// Local-interface and observed addresses both take part in pairing.
    #[test]
    fn test_integration_with_nat_traversal_flow() {
        let mut state = create_test_state();
        let now = Instant::now();

        let local_addr = SocketAddr::from(([192, 168, 1, 2], 5000));
        state.add_local_candidate(local_addr, CandidateSource::Local, now);

        let discovered_addr = SocketAddr::from(([44, 55, 66, 77], 5000));
        state.add_local_candidate(discovered_addr, observed(), now);

        // Remote candidates use valid public IPs.
        let remote1 = SocketAddr::from(([93, 184, 215, 123], 6000));
        let remote2 = SocketAddr::from(([172, 217, 16, 34], 7000));
        let remote_priority = VarInt::from_u32(100);
        state
            .add_remote_candidate(VarInt::from_u32(1), remote1, remote_priority, now)
            .expect("add remote candidate should succeed");
        state
            .add_remote_candidate(VarInt::from_u32(2), remote2, remote_priority, now)
            .expect("add remote candidate should succeed");

        state.generate_candidate_pairs(now);

        // Should have 4 pairs (2 local × 2 remote)
        assert_eq!(state.candidate_pairs.len(), 4);

        // The observed address is paired with both remote candidates.
        let discovered_pair_count = state
            .candidate_pairs
            .iter()
            .filter(|pair| pair.local_addr == discovered_addr)
            .count();
        assert_eq!(discovered_pair_count, 2);
    }
}