1use std::{
10 collections::{HashMap, VecDeque},
11 net::{IpAddr, SocketAddr},
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use tracing::{debug, error, info, warn};
17
18use crate::Connection;
19
20use crate::{
21 connection::nat_traversal::{CandidateSource, CandidateState},
22 nat_traversal_api::{BootstrapNode, CandidateAddress, PeerId},
23};
24
25#[cfg(all(target_os = "windows", feature = "network-discovery"))]
27pub mod windows;
28
29#[cfg(all(target_os = "windows", feature = "network-discovery"))]
30pub use windows::WindowsInterfaceDiscovery;
31
32#[cfg(all(target_os = "linux", feature = "network-discovery"))]
33pub mod linux;
34
35#[cfg(all(target_os = "linux", feature = "network-discovery"))]
36pub use linux::LinuxInterfaceDiscovery;
37
38#[cfg(all(target_os = "macos", feature = "network-discovery"))]
39pub(crate) mod macos;
40
41#[cfg(all(target_os = "macos", feature = "network-discovery"))]
42pub(crate) use macos::MacOSInterfaceDiscovery;
43
44fn convert_to_nat_source(discovery_source: DiscoverySourceType) -> CandidateSource {
46 match discovery_source {
47 DiscoverySourceType::Local => CandidateSource::Local,
48 DiscoverySourceType::ServerReflexive => CandidateSource::Observed { by_node: None },
49 DiscoverySourceType::Predicted => CandidateSource::Predicted,
50 }
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum DiscoverySourceType {
56 Local,
57 ServerReflexive,
58 Predicted,
59}
60
61#[derive(Debug, Clone)]
63pub(crate) struct DiscoveryCandidate {
64 pub address: SocketAddr,
65 pub priority: u32,
66 pub source: DiscoverySourceType,
67 pub state: CandidateState,
68}
69
70impl DiscoveryCandidate {
71 pub(crate) fn to_candidate_address(&self) -> CandidateAddress {
73 CandidateAddress {
74 address: self.address,
75 priority: self.priority,
76 source: convert_to_nat_source(self.source),
77 state: self.state,
78 }
79 }
80}
81
82#[derive(Debug)]
84
85pub struct DiscoverySession {
86 peer_id: PeerId,
88 session_id: u64,
90 current_phase: DiscoveryPhase,
92 started_at: Instant,
94 discovered_candidates: Vec<DiscoveryCandidate>,
96 statistics: DiscoveryStatistics,
98 allocation_history: VecDeque<PortAllocationEvent>,
100 server_reflexive_discovery: ServerReflexiveDiscovery,
102}
103
104pub struct CandidateDiscoveryManager {
106 config: DiscoveryConfig,
108 interface_discovery: Arc<std::sync::Mutex<Box<dyn NetworkInterfaceDiscovery + Send>>>,
110 symmetric_predictor: Arc<std::sync::Mutex<SymmetricNatPredictor>>,
112 bootstrap_manager: Arc<BootstrapNodeManager>,
114 cache: DiscoveryCache,
116 active_sessions: HashMap<PeerId, DiscoverySession>,
118 cached_local_candidates: Option<(Instant, Vec<ValidatedCandidate>)>,
120 local_cache_duration: Duration,
122 pending_validations: HashMap<CandidateId, PendingValidation>,
124}
125
126#[derive(Debug, Clone)]
128pub struct DiscoveryConfig {
129 pub total_timeout: Duration,
131 pub local_scan_timeout: Duration,
133 pub bootstrap_query_timeout: Duration,
135 pub max_query_retries: u32,
137 pub max_candidates: usize,
139 pub enable_symmetric_prediction: bool,
141 pub min_bootstrap_consensus: usize,
143 pub interface_cache_ttl: Duration,
145 pub server_reflexive_cache_ttl: Duration,
147 pub bound_address: Option<SocketAddr>,
149}
150
151#[derive(Debug, Clone, PartialEq)]
153pub enum DiscoveryPhase {
154 Idle,
156 LocalInterfaceScanning { started_at: Instant },
158 ServerReflexiveQuerying {
160 started_at: Instant,
161 active_queries: HashMap<BootstrapNodeId, QueryState>,
162 responses_received: Vec<ServerReflexiveResponse>,
163 },
164 SymmetricNatPrediction {
166 started_at: Instant,
167 prediction_attempts: u32,
168 pattern_analysis: PatternAnalysisState,
169 },
170 CandidateValidation {
172 started_at: Instant,
173 validation_results: HashMap<CandidateId, ValidationResult>,
174 },
175 Completed {
177 final_candidates: Vec<ValidatedCandidate>,
178 completion_time: Instant,
179 },
180 Failed {
182 error: DiscoveryError,
184 failed_at: Instant,
186 fallback_options: Vec<FallbackStrategy>,
188 },
189}
190
191#[derive(Debug, Clone)]
193pub enum DiscoveryEvent {
194 DiscoveryStarted {
196 peer_id: PeerId,
197 bootstrap_count: usize,
198 },
199 LocalScanningStarted,
201 LocalCandidateDiscovered { candidate: CandidateAddress },
203 LocalScanningCompleted {
205 candidate_count: usize,
206 duration: Duration,
207 },
208 ServerReflexiveDiscoveryStarted { bootstrap_count: usize },
210 ServerReflexiveCandidateDiscovered {
212 candidate: CandidateAddress,
213 bootstrap_node: SocketAddr,
214 },
215 BootstrapQueryFailed {
217 bootstrap_node: SocketAddr,
219 error: String,
221 },
222 SymmetricPredictionStarted { base_address: SocketAddr },
224 PredictedCandidateGenerated {
226 candidate: CandidateAddress,
227 confidence: f64,
228 },
229 PortAllocationDetected {
231 port: u16,
232 source_address: SocketAddr,
233 bootstrap_node: BootstrapNodeId,
234 timestamp: Instant,
235 },
236 DiscoveryCompleted {
238 candidate_count: usize,
239 total_duration: Duration,
240 success_rate: f64,
241 },
242 DiscoveryFailed {
244 error: DiscoveryError,
246 partial_results: Vec<CandidateAddress>,
248 },
249 PathValidationRequested {
251 candidate_id: CandidateId,
252 candidate_address: SocketAddr,
253 challenge_token: u64,
254 },
255 PathValidationResponse {
257 candidate_id: CandidateId,
258 candidate_address: SocketAddr,
259 challenge_token: u64,
260 rtt: Duration,
261 },
262}
263
264#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
266pub struct BootstrapNodeId(pub u64);
267
268struct PendingValidation {
270 candidate_address: SocketAddr,
272 challenge_token: u64,
274 started_at: Instant,
276 attempts: u32,
278}
279
280#[derive(Debug, Clone, PartialEq, Eq)]
282pub enum QueryState {
283 Pending { sent_at: Instant, attempts: u32 },
285 Completed,
287 Failed,
289}
290
291#[derive(Debug, Clone, PartialEq)]
293pub struct ServerReflexiveResponse {
294 pub bootstrap_node: BootstrapNodeId,
295 pub observed_address: SocketAddr,
296 pub response_time: Duration,
297 pub timestamp: Instant,
298}
299
300#[derive(Debug, Clone, PartialEq)]
302pub struct PatternAnalysisState {
303 pub allocation_history: VecDeque<PortAllocationEvent>,
304 pub detected_pattern: Option<PortAllocationPattern>,
305 pub confidence_level: f64,
306 pub prediction_accuracy: f64,
307}
308
309#[derive(Debug, Clone, PartialEq)]
311pub struct PortAllocationEvent {
312 pub port: u16,
313 pub timestamp: Instant,
314 pub source_address: SocketAddr,
315}
316
317#[derive(Debug, Clone, PartialEq)]
319pub struct PortAllocationPattern {
320 pub pattern_type: AllocationPatternType,
321 pub base_port: u16,
322 pub stride: u16,
323 pub pool_boundaries: Option<(u16, u16)>,
324 pub confidence: f64,
325}
326
327#[derive(Debug, Clone, PartialEq, Eq)]
329pub enum AllocationPatternType {
330 Sequential,
332 FixedStride,
334 Random,
336 PoolBased,
338 TimeBased,
340 Unknown,
342}
343
344#[derive(Debug, Clone)]
346pub struct PortPatternAnalysis {
347 pub pattern: PortAllocationPattern,
349 pub increment: Option<i32>,
351 pub base_port: u16,
353}
354
355#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
357pub struct CandidateId(pub u64);
358
359#[derive(Debug, Clone, PartialEq)]
361pub enum ValidationResult {
362 Valid { rtt: Duration },
363 Invalid { reason: String },
364 Timeout,
365 Pending,
366}
367
368#[derive(Debug, Clone, PartialEq)]
370pub struct ValidatedCandidate {
371 pub id: CandidateId,
372 pub address: SocketAddr,
373 pub source: DiscoverySourceType,
374 pub priority: u32,
375 pub rtt: Option<Duration>,
376 pub reliability_score: f64,
377}
378
379impl ValidatedCandidate {
380 pub fn to_candidate_address(&self) -> CandidateAddress {
382 CandidateAddress {
383 address: self.address,
384 priority: self.priority,
385 source: convert_to_nat_source(self.source),
386 state: CandidateState::Valid,
387 }
388 }
389}
390
391#[derive(Debug)]
393
394pub(crate) struct DiscoverySessionState {
395 pub peer_id: PeerId,
396 pub session_id: u64,
397 pub started_at: Instant,
398 pub discovered_candidates: Vec<DiscoveryCandidate>,
399 pub statistics: DiscoveryStatistics,
400 pub allocation_history: VecDeque<PortAllocationEvent>,
401}
402
403#[derive(Debug, Default, Clone)]
405pub struct DiscoveryStatistics {
406 pub local_candidates_found: u32,
407 pub server_reflexive_candidates_found: u32,
408 pub predicted_candidates_generated: u32,
409 pub bootstrap_queries_sent: u32,
410 pub bootstrap_queries_successful: u32,
411 pub total_discovery_time: Option<Duration>,
412 pub average_bootstrap_rtt: Option<Duration>,
413 pub invalid_addresses_rejected: u32,
414}
415
416#[derive(Debug, Clone, PartialEq, Eq)]
418pub enum DiscoveryError {
419 NoLocalInterfaces,
421 AllBootstrapsFailed,
423 DiscoveryTimeout,
425 InsufficientCandidates { found: usize, required: usize },
427 NetworkError(String),
429 ConfigurationError(String),
431 InternalError(String),
433}
434
435#[derive(Debug, Clone, PartialEq, Eq)]
437pub enum FallbackStrategy {
438 UseCachedResults,
440 RetryWithRelaxedParams,
442 UseMinimalCandidates,
444 EnableRelayFallback,
446}
447
448impl Default for DiscoveryConfig {
449 fn default() -> Self {
450 Self {
451 total_timeout: Duration::from_secs(30),
452 local_scan_timeout: Duration::from_secs(2),
453 bootstrap_query_timeout: Duration::from_secs(5),
454 max_query_retries: 3,
455 max_candidates: 8,
456 enable_symmetric_prediction: true,
457 min_bootstrap_consensus: 2,
458 interface_cache_ttl: Duration::from_secs(60),
459 server_reflexive_cache_ttl: Duration::from_secs(300),
460 bound_address: None,
461 }
462 }
463}
464
465impl DiscoverySession {
466 fn new(peer_id: PeerId, config: &DiscoveryConfig) -> Self {
468 Self {
469 peer_id,
470 session_id: rand::random(),
471 current_phase: DiscoveryPhase::Idle,
472 started_at: Instant::now(),
473 discovered_candidates: Vec::new(),
474 statistics: DiscoveryStatistics::default(),
475 allocation_history: VecDeque::new(),
476 server_reflexive_discovery: ServerReflexiveDiscovery::new(config),
477 }
478 }
479}
480
481impl CandidateDiscoveryManager {
482 pub fn new(config: DiscoveryConfig) -> Self {
484 let interface_discovery =
485 Arc::new(std::sync::Mutex::new(create_platform_interface_discovery()));
486 let symmetric_predictor =
487 Arc::new(std::sync::Mutex::new(SymmetricNatPredictor::new(&config)));
488 let bootstrap_manager = Arc::new(BootstrapNodeManager::new(&config));
489 let cache = DiscoveryCache::new(&config);
490 let local_cache_duration = config.interface_cache_ttl;
491
492 Self {
493 config,
494 interface_discovery,
495 symmetric_predictor,
496 bootstrap_manager,
497 cache,
498 active_sessions: HashMap::new(),
499 cached_local_candidates: None,
500 local_cache_duration,
501 pending_validations: HashMap::new(),
502 }
503 }
504
505 pub fn set_bound_address(&mut self, address: SocketAddr) {
507 self.config.bound_address = Some(address);
508 self.cached_local_candidates = None;
510 }
511
512 pub fn discover_local_candidates(&mut self) -> Result<Vec<ValidatedCandidate>, DiscoveryError> {
514 self.interface_discovery
516 .lock()
517 .unwrap()
518 .start_scan()
519 .map_err(|e| {
520 DiscoveryError::NetworkError(format!("Failed to start interface scan: {e}"))
521 })?;
522
523 let start = Instant::now();
525 let timeout = Duration::from_secs(2);
526
527 loop {
528 if start.elapsed() > timeout {
529 return Err(DiscoveryError::DiscoveryTimeout);
530 }
531
532 if let Some(interfaces) = self
533 .interface_discovery
534 .lock()
535 .unwrap()
536 .check_scan_complete()
537 {
538 let mut candidates = Vec::new();
540
541 for interface in interfaces {
542 for addr in interface.addresses {
543 candidates.push(ValidatedCandidate {
544 id: CandidateId(rand::random()),
545 address: addr,
546 source: DiscoverySourceType::Local,
547 priority: 50000, rtt: None,
549 reliability_score: 1.0,
550 });
551 }
552 }
553
554 if candidates.is_empty() {
555 return Err(DiscoveryError::NoLocalInterfaces);
556 }
557
558 return Ok(candidates);
559 }
560
561 std::thread::sleep(Duration::from_millis(10));
563 }
564 }
565
566 pub fn start_discovery(
568 &mut self,
569 peer_id: PeerId,
570 _bootstrap_nodes: Vec<BootstrapNode>,
571 ) -> Result<(), DiscoveryError> {
572 if self.active_sessions.contains_key(&peer_id) {
574 return Err(DiscoveryError::InternalError(format!(
575 "Discovery already in progress for peer {peer_id:?}"
576 )));
577 }
578
579 info!("Starting candidate discovery for peer {:?}", peer_id);
580
581 let mut session = DiscoverySession::new(peer_id, &self.config);
583
584 session.current_phase = DiscoveryPhase::LocalInterfaceScanning {
589 started_at: Instant::now(),
590 };
591
592 self.active_sessions.insert(peer_id, session);
594
595 Ok(())
596 }
597
598 pub fn poll(&mut self, now: Instant) -> Vec<DiscoveryEvent> {
600 let mut all_events = Vec::new();
601 let mut completed_sessions = Vec::new();
602
603 let mut local_scan_events = Vec::new();
606 for (peer_id, session) in &mut self.active_sessions {
607 if let DiscoveryPhase::LocalInterfaceScanning { started_at } = &session.current_phase {
608 if started_at.elapsed() > self.config.local_scan_timeout {
610 local_scan_events.push((
611 *peer_id,
612 DiscoveryEvent::LocalScanningCompleted {
613 candidate_count: 0,
614 duration: started_at.elapsed(),
615 },
616 ));
617 }
618 }
619 }
620
621 for (peer_id, event) in local_scan_events {
623 all_events.push(event);
624 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
625 session.current_phase = DiscoveryPhase::Completed {
627 final_candidates: session
628 .discovered_candidates
629 .iter()
630 .map(|dc| ValidatedCandidate {
631 id: CandidateId(0),
632 address: dc.address,
633 source: dc.source,
634 priority: dc.priority,
635 rtt: None,
636 reliability_score: 1.0,
637 })
638 .collect(),
639 completion_time: now,
640 };
641
642 all_events.push(DiscoveryEvent::DiscoveryCompleted {
643 candidate_count: session.discovered_candidates.len(),
644 total_duration: now.duration_since(session.started_at),
645 success_rate: 1.0,
646 });
647
648 completed_sessions.push(peer_id);
649 }
650 }
651
652 for peer_id in completed_sessions {
654 self.active_sessions.remove(&peer_id);
655 debug!("Removed completed discovery session for peer {:?}", peer_id);
656 }
657
658 all_events
659 }
660
661 pub fn get_status(&self) -> DiscoveryStatus {
663 DiscoveryStatus {
665 phase: DiscoveryPhase::Idle,
666 discovered_candidates: Vec::new(),
667 statistics: DiscoveryStatistics::default(),
668 elapsed_time: Duration::from_secs(0),
669 }
670 }
671
672 pub fn is_complete(&self) -> bool {
674 self.active_sessions.values().all(|session| {
676 matches!(
677 session.current_phase,
678 DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. }
679 )
680 })
681 }
682
683 pub fn get_results(&self) -> Option<DiscoveryResults> {
685 if self.active_sessions.is_empty() {
687 return None;
688 }
689
690 let mut all_candidates = Vec::new();
692 let mut latest_completion = Instant::now();
693 let mut combined_stats = DiscoveryStatistics::default();
694
695 for session in self.active_sessions.values() {
696 match &session.current_phase {
697 DiscoveryPhase::Completed {
698 final_candidates,
699 completion_time,
700 } => {
701 all_candidates.extend(final_candidates.clone());
703 latest_completion = *completion_time;
704 combined_stats.local_candidates_found +=
706 session.statistics.local_candidates_found;
707 combined_stats.server_reflexive_candidates_found +=
708 session.statistics.server_reflexive_candidates_found;
709 combined_stats.predicted_candidates_generated +=
710 session.statistics.predicted_candidates_generated;
711 combined_stats.bootstrap_queries_sent +=
712 session.statistics.bootstrap_queries_sent;
713 combined_stats.bootstrap_queries_successful +=
714 session.statistics.bootstrap_queries_successful;
715 }
716 DiscoveryPhase::Failed { .. } => {
717 let validated: Vec<ValidatedCandidate> = session
720 .discovered_candidates
721 .iter()
722 .enumerate()
723 .map(|(idx, dc)| ValidatedCandidate {
724 id: CandidateId(idx as u64),
725 address: dc.address,
726 source: dc.source,
727 priority: dc.priority,
728 rtt: None,
729 reliability_score: 0.5, })
731 .collect();
732 all_candidates.extend(validated);
733 }
734 _ => {}
735 }
736 }
737
738 if all_candidates.is_empty() {
739 None
740 } else {
741 Some(DiscoveryResults {
742 candidates: all_candidates,
743 completion_time: latest_completion,
744 statistics: combined_stats,
745 })
746 }
747 }
748
749 pub fn get_candidates_for_peer(&self, peer_id: PeerId) -> Vec<CandidateAddress> {
751 if let Some(session) = self.active_sessions.get(&peer_id) {
753 session
755 .discovered_candidates
756 .iter()
757 .map(|c| c.to_candidate_address())
758 .collect()
759 } else {
760 debug!("No active discovery session found for peer {:?}", peer_id);
762 Vec::new()
763 }
764 }
765
766 fn poll_session_local_scanning(
769 &mut self,
770 session: &mut DiscoverySession,
771 started_at: Instant,
772 now: Instant,
773 events: &mut Vec<DiscoveryEvent>,
774 ) {
775 if let Some((cache_time, ref cached_candidates)) = self.cached_local_candidates {
777 if cache_time.elapsed() < self.local_cache_duration {
778 debug!(
780 "Using cached local candidates for peer {:?}",
781 session.peer_id
782 );
783 self.process_cached_local_candidates(
784 session,
785 cached_candidates.clone(),
786 events,
787 now,
788 );
789 return;
790 }
791 }
792
793 if started_at.elapsed().as_millis() < 10 {
796 let scan_result = self.interface_discovery.lock().unwrap().start_scan();
797 match scan_result {
798 Ok(()) => {
799 debug!(
800 "Started local interface scan for peer {:?}",
801 session.peer_id
802 );
803 events.push(DiscoveryEvent::LocalScanningStarted);
804 }
805 Err(e) => {
806 error!("Failed to start interface scan: {}", e);
807 self.handle_session_local_scan_timeout(session, events, now);
808 return;
809 }
810 }
811 }
812
813 if started_at.elapsed() > self.config.local_scan_timeout {
815 warn!(
816 "Local interface scanning timeout for peer {:?}",
817 session.peer_id
818 );
819 self.handle_session_local_scan_timeout(session, events, now);
820 return;
821 }
822
823 let scan_complete_result = self
825 .interface_discovery
826 .lock()
827 .unwrap()
828 .check_scan_complete();
829 if let Some(interfaces) = scan_complete_result {
830 self.process_session_local_interfaces(session, interfaces, events, now);
831 }
832 }
833
834 fn process_session_local_interfaces(
835 &mut self,
836 session: &mut DiscoverySession,
837 interfaces: Vec<NetworkInterface>,
838 events: &mut Vec<DiscoveryEvent>,
839 now: Instant,
840 ) {
841 debug!(
842 "Processing {} network interfaces for peer {:?}",
843 interfaces.len(),
844 session.peer_id
845 );
846
847 let mut validated_candidates = Vec::new();
848
849 if let Some(bound_addr) = self.config.bound_address {
851 if self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback() {
852 let candidate = DiscoveryCandidate {
853 address: bound_addr,
854 priority: 60000, source: DiscoverySourceType::Local,
856 state: CandidateState::New,
857 };
858
859 session.discovered_candidates.push(candidate.clone());
860 session.statistics.local_candidates_found += 1;
861
862 validated_candidates.push(ValidatedCandidate {
864 id: CandidateId(rand::random()),
865 address: bound_addr,
866 source: DiscoverySourceType::Local,
867 priority: candidate.priority,
868 rtt: None,
869 reliability_score: 1.0,
870 });
871
872 events.push(DiscoveryEvent::LocalCandidateDiscovered {
873 candidate: candidate.to_candidate_address(),
874 });
875
876 debug!(
877 "Added bound address {} as local candidate for peer {:?}",
878 bound_addr, session.peer_id
879 );
880 }
881 }
882
883 for interface in &interfaces {
885 for address in &interface.addresses {
886 if Some(*address) == self.config.bound_address {
888 continue;
889 }
890
891 if self.is_valid_local_address(address) {
892 let candidate = DiscoveryCandidate {
893 address: *address,
894 priority: self.calculate_local_priority(address, interface),
895 source: DiscoverySourceType::Local,
896 state: CandidateState::New,
897 };
898
899 session.discovered_candidates.push(candidate.clone());
900 session.statistics.local_candidates_found += 1;
901
902 validated_candidates.push(ValidatedCandidate {
904 id: CandidateId(rand::random()),
905 address: *address,
906 source: DiscoverySourceType::Local,
907 priority: candidate.priority,
908 rtt: None,
909 reliability_score: 1.0,
910 });
911
912 events.push(DiscoveryEvent::LocalCandidateDiscovered {
913 candidate: candidate.to_candidate_address(),
914 });
915 }
916 }
917 }
918
919 self.cached_local_candidates = Some((now, validated_candidates));
921
922 events.push(DiscoveryEvent::LocalScanningCompleted {
923 candidate_count: session.statistics.local_candidates_found as usize,
924 duration: now.duration_since(session.started_at),
925 });
926
927 self.start_session_server_reflexive_discovery(session, events, now);
929 }
930
931 fn process_cached_local_candidates(
932 &mut self,
933 session: &mut DiscoverySession,
934 mut cached_candidates: Vec<ValidatedCandidate>,
935 events: &mut Vec<DiscoveryEvent>,
936 now: Instant,
937 ) {
938 if let Some(bound_addr) = self.config.bound_address {
940 let has_bound_addr = cached_candidates.iter().any(|c| c.address == bound_addr);
941 if !has_bound_addr
942 && (self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback())
943 {
944 cached_candidates.insert(
945 0,
946 ValidatedCandidate {
947 id: CandidateId(rand::random()),
948 address: bound_addr,
949 source: DiscoverySourceType::Local,
950 priority: 60000, rtt: None,
952 reliability_score: 1.0,
953 },
954 );
955 }
956 }
957
958 debug!(
959 "Using {} cached local candidates for peer {:?}",
960 cached_candidates.len(),
961 session.peer_id
962 );
963
964 for validated in cached_candidates {
965 let candidate = DiscoveryCandidate {
966 address: validated.address,
967 priority: validated.priority,
968 source: validated.source,
969 state: CandidateState::New,
970 };
971
972 session.discovered_candidates.push(candidate.clone());
973 session.statistics.local_candidates_found += 1;
974
975 events.push(DiscoveryEvent::LocalCandidateDiscovered {
976 candidate: candidate.to_candidate_address(),
977 });
978 }
979
980 events.push(DiscoveryEvent::LocalScanningCompleted {
981 candidate_count: session.statistics.local_candidates_found as usize,
982 duration: now.duration_since(session.started_at),
983 });
984
985 self.start_session_server_reflexive_discovery(session, events, now);
987 }
988
989 fn start_session_server_reflexive_discovery(
990 &mut self,
991 session: &mut DiscoverySession,
992 events: &mut Vec<DiscoveryEvent>,
993 now: Instant,
994 ) {
995 let has_quic_discovered = session
997 .discovered_candidates
998 .iter()
999 .any(|c| c.source == DiscoverySourceType::ServerReflexive);
1000
1001 if has_quic_discovered {
1002 info!(
1003 "Skipping server reflexive discovery for peer {:?}, using QUIC-discovered addresses",
1004 session.peer_id
1005 );
1006 self.complete_session_discovery_with_local_candidates(session, events, now);
1008 return;
1009 }
1010
1011 let bootstrap_node_ids = self.bootstrap_manager.get_active_bootstrap_nodes();
1012
1013 if bootstrap_node_ids.is_empty() {
1014 info!(
1015 "No bootstrap nodes available for server reflexive discovery for peer {:?}, completing with local candidates only",
1016 session.peer_id
1017 );
1018 self.complete_session_discovery_with_local_candidates(session, events, now);
1020 return;
1021 }
1022
1023 let bootstrap_nodes_with_addresses: Vec<(BootstrapNodeId, SocketAddr)> = bootstrap_node_ids
1025 .iter()
1026 .filter_map(|&node_id| {
1027 self.bootstrap_manager
1028 .get_bootstrap_address(node_id)
1029 .map(|addr| (node_id, addr))
1030 })
1031 .collect();
1032
1033 if bootstrap_nodes_with_addresses.is_empty() {
1034 warn!("No bootstrap node addresses available for server reflexive discovery");
1035 self.complete_session_discovery_with_local_candidates(session, events, now);
1037 return;
1038 }
1039
1040 let active_queries = session
1042 .server_reflexive_discovery
1043 .start_queries_with_addresses(&bootstrap_nodes_with_addresses, now);
1044
1045 events.push(DiscoveryEvent::ServerReflexiveDiscoveryStarted {
1046 bootstrap_count: bootstrap_nodes_with_addresses.len(),
1047 });
1048
1049 session.current_phase = DiscoveryPhase::ServerReflexiveQuerying {
1050 started_at: now,
1051 active_queries,
1052 responses_received: Vec::new(),
1053 };
1054 }
1055
1056 fn process_server_reflexive_response_for_session(
1057 &mut self,
1058 session: &mut DiscoverySession,
1059 response: &ServerReflexiveResponse,
1060 events: &mut Vec<DiscoveryEvent>,
1061 ) {
1062 debug!("Received server reflexive response: {:?}", response);
1063
1064 if !self.is_valid_server_reflexive_address(&response.observed_address) {
1066 warn!(
1067 "Ignoring invalid server reflexive address {} from bootstrap node",
1068 response.observed_address
1069 );
1070 session.statistics.invalid_addresses_rejected += 1;
1071 return;
1072 }
1073
1074 if response.response_time > Duration::from_secs(10) {
1076 warn!(
1077 "Ignoring server reflexive response with excessive delay: {:?}",
1078 response.response_time
1079 );
1080 return;
1081 }
1082
1083 let allocation_event = PortAllocationEvent {
1085 port: response.observed_address.port(),
1086 timestamp: response.timestamp,
1087 source_address: response.observed_address,
1088 };
1089
1090 if let DiscoveryPhase::ServerReflexiveQuerying { .. } = &mut session.current_phase {
1092 session
1095 .allocation_history
1096 .push_back(allocation_event.clone());
1097
1098 if session.allocation_history.len() > 20 {
1100 session.allocation_history.pop_front();
1101 }
1102 }
1103
1104 let candidate = DiscoveryCandidate {
1105 address: response.observed_address,
1106 priority: self.calculate_server_reflexive_priority(response),
1107 source: DiscoverySourceType::ServerReflexive,
1108 state: CandidateState::New,
1109 };
1110
1111 session.discovered_candidates.push(candidate.clone());
1112 session.statistics.server_reflexive_candidates_found += 1;
1113
1114 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1115 candidate: candidate.to_candidate_address(),
1116 bootstrap_node: self
1117 .bootstrap_manager
1118 .get_bootstrap_address(response.bootstrap_node)
1119 .unwrap_or_else(|| "unknown".parse().unwrap()),
1120 });
1121
1122 events.push(DiscoveryEvent::PortAllocationDetected {
1123 port: allocation_event.port,
1124 source_address: allocation_event.source_address,
1125 bootstrap_node: response.bootstrap_node,
1126 timestamp: allocation_event.timestamp,
1127 });
1128 }
1129
1130 fn start_session_symmetric_prediction(
1131 &mut self,
1132 session: &mut DiscoverySession,
1133 responses: &[ServerReflexiveResponse],
1134 events: &mut Vec<DiscoveryEvent>,
1135 now: Instant,
1136 ) {
1137 if !self.config.enable_symmetric_prediction || responses.is_empty() {
1138 self.complete_session_discovery_with_local_candidates(session, events, now);
1140 return;
1141 }
1142
1143 let base_address = self.calculate_consensus_address(responses);
1145
1146 events.push(DiscoveryEvent::SymmetricPredictionStarted { base_address });
1147
1148 let detected_pattern = self
1150 .symmetric_predictor
1151 .lock()
1152 .unwrap()
1153 .analyze_allocation_patterns(&session.allocation_history);
1154
1155 let confidence_level = detected_pattern
1156 .as_ref()
1157 .map(|p| p.confidence)
1158 .unwrap_or(0.0);
1159
1160 let prediction_accuracy = if let Some(ref pattern) = detected_pattern {
1162 self.calculate_prediction_accuracy(pattern, &session.allocation_history)
1163 } else {
1164 0.3 };
1166
1167 debug!(
1168 "Symmetric NAT pattern analysis: detected_pattern={:?}, confidence={:.2}, accuracy={:.2}",
1169 detected_pattern, confidence_level, prediction_accuracy
1170 );
1171
1172 session.current_phase = DiscoveryPhase::SymmetricNatPrediction {
1173 started_at: now,
1174 prediction_attempts: 0,
1175 pattern_analysis: PatternAnalysisState {
1176 allocation_history: session.allocation_history.clone(),
1177 detected_pattern,
1178 confidence_level,
1179 prediction_accuracy,
1180 },
1181 };
1182 }
1183
1184 fn start_session_candidate_validation(
1185 &mut self,
1186 session: &mut DiscoverySession,
1187 _events: &mut Vec<DiscoveryEvent>,
1188 now: Instant,
1189 ) {
1190 debug!(
1191 "Starting candidate validation for {} candidates",
1192 session.discovered_candidates.len()
1193 );
1194
1195 session.current_phase = DiscoveryPhase::CandidateValidation {
1196 started_at: now,
1197 validation_results: HashMap::new(),
1198 };
1199 }
1200
1201 fn start_path_validation(
1203 &mut self,
1204 candidate_id: CandidateId,
1205 candidate_address: SocketAddr,
1206 now: Instant,
1207 events: &mut Vec<DiscoveryEvent>,
1208 ) {
1209 debug!(
1210 "Starting QUIC path validation for candidate {} at {}",
1211 candidate_id.0, candidate_address
1212 );
1213
1214 let challenge_token: u64 = rand::random();
1216
1217 self.pending_validations.insert(
1219 candidate_id,
1220 PendingValidation {
1221 candidate_address,
1222 challenge_token,
1223 started_at: now,
1224 attempts: 1,
1225 },
1226 );
1227
1228 events.push(DiscoveryEvent::PathValidationRequested {
1230 candidate_id,
1231 candidate_address,
1232 challenge_token,
1233 });
1234
1235 debug!(
1236 "PATH_CHALLENGE {:08x} requested for candidate {} at {}",
1237 challenge_token, candidate_id.0, candidate_address
1238 );
1239 }
1240
1241 pub fn handle_path_response(
1243 &mut self,
1244 candidate_address: SocketAddr,
1245 challenge_token: u64,
1246 now: Instant,
1247 ) -> Option<DiscoveryEvent> {
1248 let candidate_id = self
1250 .pending_validations
1251 .iter()
1252 .find(|(_, validation)| {
1253 validation.candidate_address == candidate_address
1254 && validation.challenge_token == challenge_token
1255 })
1256 .map(|(id, _)| *id)?;
1257
1258 let validation = self.pending_validations.remove(&candidate_id)?;
1260 let rtt = now.duration_since(validation.started_at);
1261
1262 debug!(
1263 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1264 candidate_id.0, candidate_address, rtt
1265 );
1266
1267 for session in self.active_sessions.values_mut() {
1269 if let Some(candidate) = session
1270 .discovered_candidates
1271 .iter_mut()
1272 .find(|c| c.address == candidate_address)
1273 {
1274 candidate.state = CandidateState::Valid;
1275 break;
1277 }
1278 }
1279
1280 Some(DiscoveryEvent::PathValidationResponse {
1281 candidate_id,
1282 candidate_address,
1283 challenge_token,
1284 rtt,
1285 })
1286 }
1287
1288 fn simulate_path_validation(
1290 &mut self,
1291 candidate_id: CandidateId,
1292 candidate_address: SocketAddr,
1293 _now: Instant,
1294 ) {
1295 let is_local = candidate_address.ip().is_loopback()
1297 || (candidate_address.ip().is_ipv4()
1298 && candidate_address.ip().to_string().starts_with("192.168."))
1299 || (candidate_address.ip().is_ipv4()
1300 && candidate_address.ip().to_string().starts_with("10."))
1301 || (candidate_address.ip().is_ipv4()
1302 && candidate_address.ip().to_string().starts_with("172."));
1303
1304 let is_server_reflexive = !is_local && !candidate_address.ip().is_unspecified();
1305
1306 debug!(
1309 "Simulated path validation for candidate {} at {} - local: {}, server_reflexive: {}",
1310 candidate_id.0, candidate_address, is_local, is_server_reflexive
1311 );
1312 }
1313
1314 fn simulate_validation_result(&self, address: &SocketAddr) -> ValidationResult {
1316 let is_local = address.ip().is_loopback()
1317 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("192.168."))
1318 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("10."))
1319 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("172."));
1320
1321 if is_local {
1322 ValidationResult::Valid {
1324 rtt: Duration::from_millis(1),
1325 }
1326 } else if address.ip().is_unspecified() {
1327 ValidationResult::Invalid {
1329 reason: "Unspecified address".to_string(),
1330 }
1331 } else {
1332 ValidationResult::Valid {
1334 rtt: Duration::from_millis(50 + (address.port() % 100) as u64),
1335 }
1336 }
1337 }
1338
1339 fn calculate_reliability_score(&self, candidate: &DiscoveryCandidate, rtt: Duration) -> f64 {
1341 let mut score: f64 = 0.5; match candidate.source {
1345 DiscoverySourceType::Local => score += 0.3, DiscoverySourceType::ServerReflexive => score += 0.2, DiscoverySourceType::Predicted => score += 0.1, }
1349
1350 let rtt_ms = rtt.as_millis() as f64;
1352 if rtt_ms < 10.0 {
1353 score += 0.2;
1354 } else if rtt_ms < 50.0 {
1355 score += 0.1;
1356 } else if rtt_ms > 200.0 {
1357 score -= 0.1;
1358 }
1359
1360 if candidate.address.ip().is_ipv6() {
1362 score += 0.05; }
1364
1365 score.max(0.0).min(1.0)
1367 }
1368
1369 fn handle_session_timeout(
1372 &mut self,
1373 session: &mut DiscoverySession,
1374 events: &mut Vec<DiscoveryEvent>,
1375 now: Instant,
1376 ) {
1377 let error = DiscoveryError::DiscoveryTimeout;
1378 let partial_results = session
1379 .discovered_candidates
1380 .iter()
1381 .map(|c| c.to_candidate_address())
1382 .collect();
1383
1384 warn!(
1385 "Discovery failed for peer {:?}: discovery process timed out (found {} partial candidates)",
1386 session.peer_id,
1387 session.discovered_candidates.len()
1388 );
1389 events.push(DiscoveryEvent::DiscoveryFailed {
1390 error: error.clone(),
1391 partial_results,
1392 });
1393
1394 session.current_phase = DiscoveryPhase::Failed {
1395 error,
1396 failed_at: now,
1397 fallback_options: vec![FallbackStrategy::UseCachedResults],
1398 };
1399 }
1400
1401 fn handle_session_local_scan_timeout(
1402 &mut self,
1403 session: &mut DiscoverySession,
1404 events: &mut Vec<DiscoveryEvent>,
1405 now: Instant,
1406 ) {
1407 warn!(
1408 "Local interface scan timeout for peer {:?}, proceeding with available candidates",
1409 session.peer_id
1410 );
1411
1412 events.push(DiscoveryEvent::LocalScanningCompleted {
1413 candidate_count: session.statistics.local_candidates_found as usize,
1414 duration: now.duration_since(session.started_at),
1415 });
1416
1417 self.start_session_server_reflexive_discovery(session, events, now);
1418 }
1419
1420 fn poll_session_server_reflexive(
1421 &mut self,
1422 session: &mut DiscoverySession,
1423 _started_at: Instant,
1424 _active_queries: &HashMap<BootstrapNodeId, QueryState>,
1425 _responses_received: &[(BootstrapNodeId, ServerReflexiveResponse)],
1426 now: Instant,
1427 events: &mut Vec<DiscoveryEvent>,
1428 ) {
1429 let has_quic_discovered = session
1431 .discovered_candidates
1432 .iter()
1433 .any(|c| c.source == DiscoverySourceType::ServerReflexive);
1434
1435 if has_quic_discovered {
1436 self.complete_session_discovery_with_local_candidates(session, events, now);
1438 return;
1439 }
1440
1441 self.complete_session_discovery_with_local_candidates(session, events, now);
1444 }
1445
1446 fn poll_session_symmetric_prediction(
1447 &mut self,
1448 session: &mut DiscoverySession,
1449 _started_at: Instant,
1450 _prediction_attempts: u32,
1451 _pattern_analysis: &PatternAnalysisState,
1452 now: Instant,
1453 events: &mut Vec<DiscoveryEvent>,
1454 ) {
1455 self.complete_session_discovery_with_local_candidates(session, events, now);
1458 }
1459
1460 fn poll_session_candidate_validation(
1461 &mut self,
1462 session: &mut DiscoverySession,
1463 _started_at: Instant,
1464 _validation_results: &HashMap<CandidateId, ValidationResult>,
1465 now: Instant,
1466 events: &mut Vec<DiscoveryEvent>,
1467 ) {
1468 self.complete_session_discovery_with_local_candidates(session, events, now);
1471 }
1472
1473 fn complete_session_discovery_with_local_candidates(
1474 &mut self,
1475 session: &mut DiscoverySession,
1476 events: &mut Vec<DiscoveryEvent>,
1477 now: Instant,
1478 ) {
1479 let duration = now.duration_since(session.started_at);
1481 session.statistics.total_discovery_time = Some(duration);
1482
1483 let success_rate = if session.statistics.local_candidates_found > 0 {
1484 1.0
1485 } else {
1486 0.0
1487 };
1488
1489 let validated_candidates: Vec<ValidatedCandidate> = session
1491 .discovered_candidates
1492 .iter()
1493 .map(|dc| ValidatedCandidate {
1494 id: CandidateId(rand::random()),
1495 address: dc.address,
1496 source: dc.source,
1497 priority: dc.priority,
1498 rtt: None,
1499 reliability_score: 1.0,
1500 })
1501 .collect();
1502
1503 events.push(DiscoveryEvent::DiscoveryCompleted {
1504 candidate_count: validated_candidates.len(),
1505 total_duration: duration,
1506 success_rate,
1507 });
1508
1509 session.current_phase = DiscoveryPhase::Completed {
1510 final_candidates: validated_candidates,
1511 completion_time: now,
1512 };
1513
1514 info!(
1515 "Discovery completed with {} local candidates for peer {:?}",
1516 session.discovered_candidates.len(),
1517 session.peer_id
1518 );
1519 }
1520
1521 fn is_valid_local_address(&self, address: &SocketAddr) -> bool {
1522 use crate::nat_traversal_api::CandidateAddress;
1524
1525 if let Err(e) = CandidateAddress::validate_address(address) {
1526 debug!("Address {} failed validation: {}", address, e);
1527 return false;
1528 }
1529
1530 match address.ip() {
1531 IpAddr::V4(ipv4) => {
1532 #[cfg(test)]
1534 if ipv4.is_loopback() {
1535 return true;
1536 }
1537 !ipv4.is_loopback()
1540 && !ipv4.is_unspecified()
1541 && !ipv4.is_broadcast()
1542 && !ipv4.is_multicast()
1543 && !ipv4.is_documentation()
1544 }
1545 IpAddr::V6(ipv6) => {
1546 #[cfg(test)]
1548 if ipv6.is_loopback() {
1549 return true;
1550 }
1551 let segments = ipv6.segments();
1553 let is_documentation = segments[0] == 0x2001 && segments[1] == 0x0db8;
1554
1555 !ipv6.is_loopback()
1556 && !ipv6.is_unspecified()
1557 && !ipv6.is_multicast()
1558 && !is_documentation
1559 }
1560 }
1561 }
1562
1563 fn is_valid_server_reflexive_address(&self, address: &SocketAddr) -> bool {
1564 use crate::nat_traversal_api::CandidateAddress;
1565
1566 if let Err(e) = CandidateAddress::validate_address(address) {
1568 debug!(
1569 "Server reflexive address {} failed validation: {}",
1570 address, e
1571 );
1572 return false;
1573 }
1574
1575 match address.ip() {
1577 IpAddr::V4(ipv4) => {
1578 !ipv4.is_private()
1581 && !ipv4.is_loopback()
1582 && !ipv4.is_link_local()
1583 && !ipv4.is_documentation()
1584 && !ipv4.is_unspecified()
1585 && !ipv4.is_broadcast()
1586 && !ipv4.is_multicast()
1587 }
1588 IpAddr::V6(ipv6) => {
1589 let segments = ipv6.segments();
1591 let is_global_unicast = (segments[0] & 0xE000) == 0x2000;
1592 let is_link_local = (segments[0] & 0xffc0) == 0xfe80;
1593 let is_unique_local = (segments[0] & 0xfe00) == 0xfc00;
1594
1595 is_global_unicast
1597 && !ipv6.is_loopback()
1598 && !ipv6.is_unspecified()
1599 && !ipv6.is_multicast()
1600 && !is_link_local
1601 && !is_unique_local
1602 }
1603 }
1604 }
1605
1606 fn calculate_local_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
1607 let mut priority = 100; match address.ip() {
1610 IpAddr::V4(ipv4) => {
1611 if ipv4.is_private() {
1612 priority += 50; }
1614 }
1615 IpAddr::V6(ipv6) => {
1616 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
1619 let segments = ipv6.segments();
1620 if segments[0] & 0xE000 == 0x2000 {
1621 priority += 60;
1623 } else if segments[0] & 0xFFC0 == 0xFE80 {
1624 priority += 20;
1626 } else if segments[0] & 0xFE00 == 0xFC00 {
1627 priority += 40;
1629 } else {
1630 priority += 30;
1632 }
1633 }
1634
1635 priority += 10; }
1638 }
1639
1640 if interface.is_wireless {
1641 priority -= 10; }
1643
1644 priority
1645 }
1646
1647 fn calculate_server_reflexive_priority(&self, response: &ServerReflexiveResponse) -> u32 {
1648 let mut priority = 200; if response.response_time < Duration::from_millis(50) {
1652 priority += 20;
1653 } else if response.response_time > Duration::from_millis(200) {
1654 priority -= 10;
1655 }
1656
1657 let age_bonus = if response.timestamp.elapsed().as_secs() < 60 {
1659 20
1660 } else {
1661 0
1662 };
1663 priority += age_bonus;
1664
1665 priority
1666 }
1667
1668 fn should_transition_to_prediction(
1669 &self,
1670 responses: &[ServerReflexiveResponse],
1671 _now: Instant,
1672 ) -> bool {
1673 responses.len() >= self.config.min_bootstrap_consensus.max(1)
1674 }
1675
1676 fn calculate_consensus_address(&self, responses: &[ServerReflexiveResponse]) -> SocketAddr {
1677 let mut address_counts: HashMap<SocketAddr, usize> = HashMap::new();
1679
1680 for response in responses {
1681 *address_counts.entry(response.observed_address).or_insert(0) += 1;
1682 }
1683
1684 address_counts
1685 .into_iter()
1686 .max_by_key(|(_, count)| *count)
1687 .map(|(addr, _)| addr)
1688 .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
1689 }
1690
1691 fn calculate_prediction_accuracy(
1693 &self,
1694 pattern: &PortAllocationPattern,
1695 history: &VecDeque<PortAllocationEvent>,
1696 ) -> f64 {
1697 if history.len() < 3 {
1698 return 0.3; }
1700
1701 let recent_ports: Vec<u16> = history
1703 .iter()
1704 .rev()
1705 .take(10)
1706 .map(|event| event.port)
1707 .collect();
1708
1709 let mut correct_predictions = 0;
1710 let total_predictions = recent_ports.len().saturating_sub(1);
1711
1712 if total_predictions == 0 {
1713 return 0.3;
1714 }
1715
1716 match pattern.pattern_type {
1717 AllocationPatternType::Sequential => {
1718 for i in 1..recent_ports.len() {
1720 if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == 1 {
1721 correct_predictions += 1;
1722 }
1723 }
1724 }
1725 AllocationPatternType::FixedStride => {
1726 for i in 1..recent_ports.len() {
1728 if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == pattern.stride {
1729 correct_predictions += 1;
1730 }
1731 }
1732 }
1733 AllocationPatternType::PoolBased => {
1734 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1736 for port in &recent_ports {
1737 if *port >= min_port && *port <= max_port {
1738 correct_predictions += 1;
1739 }
1740 }
1741 }
1742 }
1743 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1744 if recent_ports.len() >= 3 {
1746 let mean = recent_ports.iter().map(|&p| p as f64).sum::<f64>()
1747 / recent_ports.len() as f64;
1748 let variance = recent_ports
1749 .iter()
1750 .map(|&p| (p as f64 - mean).powi(2))
1751 .sum::<f64>()
1752 / recent_ports.len() as f64;
1753
1754 let normalized_variance = (variance / 10000.0).min(1.0); return 0.2 + (1.0 - normalized_variance) * 0.3; }
1758 }
1759 AllocationPatternType::TimeBased => {
1760 if history.len() >= 2 {
1762 let time_diffs: Vec<Duration> = history
1763 .iter()
1764 .collect::<Vec<_>>()
1765 .windows(2)
1766 .map(|w| w[1].timestamp.duration_since(w[0].timestamp))
1767 .collect();
1768
1769 if !time_diffs.is_empty() {
1770 let avg_diff =
1771 time_diffs.iter().sum::<Duration>() / time_diffs.len() as u32;
1772 let variance = time_diffs
1773 .iter()
1774 .map(|d| d.as_millis().abs_diff(avg_diff.as_millis()) as f64)
1775 .sum::<f64>()
1776 / time_diffs.len() as f64;
1777
1778 let normalized_variance = (variance / 1000.0).min(1.0); return 0.3 + (1.0 - normalized_variance) * 0.4; }
1782 }
1783 }
1784 }
1785
1786 let accuracy = if total_predictions > 0 {
1788 correct_predictions as f64 / total_predictions as f64
1789 } else {
1790 0.3
1791 };
1792
1793 let confidence_adjusted_accuracy = accuracy * pattern.confidence;
1795
1796 confidence_adjusted_accuracy.max(0.2).min(0.9)
1798 }
1799
1800 pub fn accept_quic_discovered_address(
1803 &mut self,
1804 peer_id: PeerId,
1805 discovered_address: SocketAddr,
1806 ) -> Result<(), DiscoveryError> {
1807 let priority = self.calculate_quic_discovered_priority(&discovered_address);
1809
1810 let session = self.active_sessions.get_mut(&peer_id).ok_or_else(|| {
1812 DiscoveryError::InternalError(format!(
1813 "No active discovery session for peer {peer_id:?}"
1814 ))
1815 })?;
1816
1817 let already_exists = session
1819 .discovered_candidates
1820 .iter()
1821 .any(|c| c.address == discovered_address);
1822
1823 if already_exists {
1824 debug!(
1825 "QUIC-discovered address {} already in candidates",
1826 discovered_address
1827 );
1828 return Ok(());
1829 }
1830
1831 info!("Accepting QUIC-discovered address: {}", discovered_address);
1832
1833 let candidate = DiscoveryCandidate {
1835 address: discovered_address,
1836 priority,
1837 source: DiscoverySourceType::ServerReflexive,
1838 state: CandidateState::New,
1839 };
1840
1841 session.discovered_candidates.push(candidate);
1843 session.statistics.server_reflexive_candidates_found += 1;
1844
1845 Ok(())
1846 }
1847
1848 fn calculate_quic_discovered_priority(&self, address: &SocketAddr) -> u32 {
1850 let mut priority = 255; match address.ip() {
1855 IpAddr::V4(ipv4) => {
1856 if ipv4.is_private() {
1857 priority -= 10; } else if ipv4.is_loopback() {
1859 priority -= 20; }
1861 }
1863 IpAddr::V6(ipv6) => {
1864 priority += 10; if ipv6.is_loopback() {
1868 priority -= 30; } else if ipv6.is_multicast() {
1870 priority -= 40; } else if ipv6.is_unspecified() {
1872 priority -= 50; } else {
1874 let segments = ipv6.segments();
1876 if segments[0] & 0xFFC0 == 0xFE80 {
1877 priority -= 30; } else if segments[0] & 0xFE00 == 0xFC00 {
1880 priority -= 10; }
1883 }
1885 }
1886 }
1887
1888 priority
1889 }
1890
1891 pub fn poll_discovery_progress(&mut self, peer_id: PeerId) -> Vec<DiscoveryEvent> {
1893 let mut events = Vec::new();
1894
1895 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
1896 for candidate in &session.discovered_candidates {
1898 if matches!(candidate.state, CandidateState::New) {
1899 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1900 candidate: candidate.to_candidate_address(),
1901 bootstrap_node: "0.0.0.0:0".parse().unwrap(), });
1903 }
1904 }
1905
1906 for candidate in &mut session.discovered_candidates {
1908 if matches!(candidate.state, CandidateState::New) {
1909 candidate.state = CandidateState::Validating;
1910 }
1911 }
1912 }
1913
1914 events
1915 }
1916
1917 pub fn get_discovery_status(&self, peer_id: PeerId) -> Option<DiscoveryStatus> {
1919 self.active_sessions.get(&peer_id).map(|session| {
1920 let discovered_candidates = session
1921 .discovered_candidates
1922 .iter()
1923 .map(|c| c.to_candidate_address())
1924 .collect();
1925
1926 DiscoveryStatus {
1927 phase: session.current_phase.clone(),
1928 discovered_candidates,
1929 statistics: session.statistics.clone(),
1930 elapsed_time: session.started_at.elapsed(),
1931 }
1932 })
1933 }
1934}
1935
1936#[derive(Debug, Clone)]
1938pub struct DiscoveryStatus {
1939 pub phase: DiscoveryPhase,
1940 pub discovered_candidates: Vec<CandidateAddress>,
1941 pub statistics: DiscoveryStatistics,
1942 pub elapsed_time: Duration,
1943}
1944
1945#[derive(Debug, Clone)]
1947pub struct DiscoveryResults {
1948 pub candidates: Vec<ValidatedCandidate>,
1949 pub completion_time: Instant,
1950 pub statistics: DiscoveryStatistics,
1951}
1952
1953pub trait NetworkInterfaceDiscovery {
1957 fn start_scan(&mut self) -> Result<(), String>;
1958 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>>;
1959}
1960
1961#[derive(Debug, Clone, PartialEq)]
1963pub struct NetworkInterface {
1964 pub name: String,
1965 pub addresses: Vec<SocketAddr>,
1966 pub is_up: bool,
1967 pub is_wireless: bool,
1968 pub mtu: Option<u16>,
1969}
1970
1971#[derive(Debug)]
1973
1974struct BootstrapConnection {
1975 connection: crate::Connection,
1977 address: SocketAddr,
1979 established_at: Instant,
1981 request_id: u64,
1983}
1984
1985#[derive(Debug, Clone)]
1987
1988struct AddressObservationRequest {
1989 request_id: u64,
1991 timestamp: u64,
1993 capabilities: u32,
1995}
1996
1997#[derive(Debug)]
1999pub(crate) struct ServerReflexiveDiscovery {
2000 config: DiscoveryConfig,
2001 active_queries: HashMap<BootstrapNodeId, QueryState>,
2003 responses: VecDeque<ServerReflexiveResponse>,
2005 query_timeouts: HashMap<BootstrapNodeId, Instant>,
2007 active_connections: HashMap<BootstrapNodeId, BootstrapConnection>,
2009 runtime_handle: Option<tokio::runtime::Handle>,
2011}
2012
2013impl ServerReflexiveDiscovery {
2014 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2015 Self {
2016 config: config.clone(),
2017 active_queries: HashMap::new(),
2018 responses: VecDeque::new(),
2019 query_timeouts: HashMap::new(),
2020 active_connections: HashMap::new(),
2021 runtime_handle: tokio::runtime::Handle::try_current().ok(),
2022 }
2023 }
2024
2025 pub(crate) fn start_queries(
2026 &mut self,
2027 bootstrap_nodes: &[BootstrapNodeId],
2028 now: Instant,
2029 ) -> HashMap<BootstrapNodeId, QueryState> {
2030 debug!(
2031 "Starting server reflexive queries to {} bootstrap nodes",
2032 bootstrap_nodes.len()
2033 );
2034
2035 self.active_queries.clear();
2036 self.query_timeouts.clear();
2037
2038 self.active_connections.clear();
2039
2040 for &node_id in bootstrap_nodes {
2041 let query_state = QueryState::Pending {
2042 sent_at: now,
2043 attempts: 1,
2044 };
2045
2046 self.active_queries.insert(node_id, query_state);
2047 self.query_timeouts
2048 .insert(node_id, now + self.config.bootstrap_query_timeout);
2049
2050 debug!(
2051 "Starting server reflexive query to bootstrap node {:?}",
2052 node_id
2053 );
2054
2055 if let Some(runtime) = &self.runtime_handle {
2057 self.start_quinn_query(node_id, runtime.clone(), now);
2058 } else {
2059 warn!(
2060 "No async runtime available, falling back to simulation for node {:?}",
2061 node_id
2062 );
2063 self.simulate_bootstrap_response(node_id, now);
2064 }
2065 }
2066
2067 self.active_queries.clone()
2068 }
2069
2070 pub(crate) fn start_queries_with_addresses(
2072 &mut self,
2073 bootstrap_nodes: &[(BootstrapNodeId, SocketAddr)],
2074 now: Instant,
2075 ) -> HashMap<BootstrapNodeId, QueryState> {
2076 debug!(
2077 "Starting server reflexive queries to {} bootstrap nodes with addresses",
2078 bootstrap_nodes.len()
2079 );
2080
2081 self.active_queries.clear();
2082 self.query_timeouts.clear();
2083
2084 self.active_connections.clear();
2085
2086 for &(node_id, bootstrap_address) in bootstrap_nodes {
2087 let query_state = QueryState::Pending {
2088 sent_at: now,
2089 attempts: 1,
2090 };
2091
2092 self.active_queries.insert(node_id, query_state);
2093 self.query_timeouts
2094 .insert(node_id, now + self.config.bootstrap_query_timeout);
2095
2096 debug!(
2097 "Starting server reflexive query to bootstrap node {:?} at {}",
2098 node_id, bootstrap_address
2099 );
2100
2101 if let Some(_runtime) = &self.runtime_handle {
2103 self.start_quinn_query_with_address(node_id, bootstrap_address, now);
2104 } else {
2105 warn!(
2106 "No async runtime available, falling back to simulation for node {:?}",
2107 node_id
2108 );
2109 self.simulate_bootstrap_response(node_id, now);
2110 }
2111 }
2112
2113 self.active_queries.clone()
2114 }
2115
2116 fn start_quinn_query(
2118 &mut self,
2119 node_id: BootstrapNodeId,
2120 _runtime: tokio::runtime::Handle,
2121 now: Instant,
2122 ) {
2123 let request_id = rand::random::<u64>();
2129
2130 debug!(
2131 "Starting Quinn connection to bootstrap node {:?} with request ID {}",
2132 node_id, request_id
2133 );
2134
2135 self.simulate_bootstrap_response(node_id, now);
2145 }
2146
2147 pub(crate) fn start_quinn_query_with_address(
2149 &mut self,
2150 node_id: BootstrapNodeId,
2151 bootstrap_address: SocketAddr,
2152 now: Instant,
2153 ) {
2154 let request_id = rand::random::<u64>();
2155
2156 info!(
2157 "Establishing Quinn connection to bootstrap node {:?} at {}",
2158 node_id, bootstrap_address
2159 );
2160
2161 if let Some(runtime) = &self.runtime_handle {
2163 let timeout = self.config.bootstrap_query_timeout;
2164
2165 let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
2167
2168 runtime.spawn(async move {
2173 match Self::perform_bootstrap_query(bootstrap_address, request_id, timeout).await {
2174 Ok(observed_address) => {
2175 let response = ServerReflexiveResponse {
2176 bootstrap_node: node_id,
2177 observed_address,
2178 response_time: now.elapsed(),
2179 timestamp: Instant::now(),
2180 };
2181
2182 let _ = response_tx.send(response);
2184
2185 info!(
2186 "Successfully received observed address {} from bootstrap node {:?}",
2187 observed_address, node_id
2188 );
2189 }
2190 Err(e) => {
2191 warn!(
2192 "Failed to query bootstrap node {:?} at {}: {}",
2193 node_id, bootstrap_address, e
2194 );
2195 }
2196 }
2197 });
2198 } else {
2199 warn!(
2200 "No async runtime available for Quinn query to {:?}",
2201 node_id
2202 );
2203 self.simulate_bootstrap_response(node_id, now);
2204 }
2205 }
2206
2207 async fn perform_bootstrap_query(
2212 _bootstrap_address: SocketAddr,
2213 _request_id: u64,
2214 _timeout: Duration,
2215 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
2216 Err("Bootstrap query not implemented for low-level API".into())
2220
2221 }
2281
2282 fn create_discovery_request(request_id: u64) -> Vec<u8> {
2284 let mut request = Vec::new();
2285
2286 request.extend_from_slice(&request_id.to_be_bytes());
2291 request.extend_from_slice(
2292 &std::time::SystemTime::now()
2293 .duration_since(std::time::UNIX_EPOCH)
2294 .unwrap_or_default()
2295 .as_millis()
2296 .to_be_bytes()[8..16],
2297 ); request.extend_from_slice(&1u32.to_be_bytes()); debug!(
2301 "Created discovery request: {} bytes, request_id: {}",
2302 request.len(),
2303 request_id
2304 );
2305 request
2306 }
2307
2308 async fn wait_for_add_address_frame(
2310 _connection: &Connection,
2311 _expected_request_id: u64,
2312 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
2313 Err("wait_for_add_address_frame not implemented for low-level API".into())
2316
2317 }
2353
2354 fn create_response_channel(
2356 &self,
2357 ) -> tokio::sync::mpsc::UnboundedSender<ServerReflexiveResponse> {
2358 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
2361 tx
2363 }
2364
2365 pub(crate) fn poll_queries(
2366 &mut self,
2367 _active_queries: &HashMap<BootstrapNodeId, QueryState>,
2368 now: Instant,
2369 ) -> Vec<ServerReflexiveResponse> {
2370 let mut responses = Vec::new();
2371
2372 while let Some(response) = self.responses.pop_front() {
2374 responses.push(response);
2375 }
2376
2377 let mut timed_out_nodes = Vec::new();
2379 for (&node_id, &timeout) in &self.query_timeouts {
2380 if now >= timeout {
2381 timed_out_nodes.push(node_id);
2382 }
2383 }
2384
2385 for node_id in timed_out_nodes {
2387 self.query_timeouts.remove(&node_id);
2388
2389 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
2390 match query_state {
2391 QueryState::Pending { attempts, .. }
2392 if *attempts < self.config.max_query_retries =>
2393 {
2394 *attempts += 1;
2396 let new_timeout = now + self.config.bootstrap_query_timeout;
2397 self.query_timeouts.insert(node_id, new_timeout);
2398
2399 debug!(
2400 "Retrying server reflexive query to bootstrap node {:?} (attempt {})",
2401 node_id, attempts
2402 );
2403
2404 self.simulate_bootstrap_response(node_id, now);
2406 }
2407 _ => {
2408 self.active_queries.insert(node_id, QueryState::Failed);
2410 warn!(
2411 "Server reflexive query to bootstrap node {:?} failed after retries",
2412 node_id
2413 );
2414 }
2415 }
2416 }
2417 }
2418
2419 responses
2420 }
2421
2422 fn simulate_bootstrap_response(&mut self, node_id: BootstrapNodeId, now: Instant) {
2425 let simulated_external_addr = match node_id.0 % 3 {
2427 0 => "203.0.113.1:45678".parse().unwrap(),
2428 1 => "198.51.100.2:45679".parse().unwrap(),
2429 _ => "192.0.2.3:45680".parse().unwrap(),
2430 };
2431
2432 let response = ServerReflexiveResponse {
2433 bootstrap_node: node_id,
2434 observed_address: simulated_external_addr,
2435 response_time: Duration::from_millis(50 + node_id.0 * 10),
2436 timestamp: now,
2437 };
2438
2439 self.responses.push_back(response);
2440
2441 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
2443 *query_state = QueryState::Completed;
2444 }
2445
2446 debug!(
2447 "Received simulated server reflexive response from bootstrap node {:?}: {}",
2448 node_id, simulated_external_addr
2449 );
2450 }
2451}
2452
2453#[derive(Debug)]
2455pub(crate) struct SymmetricNatPredictor {
2456 config: DiscoveryConfig,
2457}
2458
2459impl SymmetricNatPredictor {
2460 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2461 Self {
2462 config: config.clone(),
2463 }
2464 }
2465
2466 pub(crate) fn generate_predictions(
2471 &mut self,
2472 pattern_analysis: &PatternAnalysisState,
2473 max_count: usize,
2474 ) -> Vec<DiscoveryCandidate> {
2475 let mut predictions = Vec::new();
2476
2477 if pattern_analysis.allocation_history.is_empty() || max_count == 0 {
2478 return predictions;
2479 }
2480
2481 let recent_events: Vec<_> = pattern_analysis
2483 .allocation_history
2484 .iter()
2485 .rev()
2486 .take(5) .collect();
2488
2489 if recent_events.len() < 2 {
2490 return predictions;
2491 }
2492
2493 match &pattern_analysis.detected_pattern {
2494 Some(pattern) => {
2495 predictions.extend(self.generate_pattern_based_predictions(pattern, max_count));
2496 }
2497 None => {
2498 predictions.extend(self.generate_heuristic_predictions(&recent_events, max_count));
2499 }
2500 }
2501
2502 predictions.truncate(max_count);
2504 predictions
2505 }
2506
2507 fn generate_pattern_based_predictions(
2509 &self,
2510 pattern: &PortAllocationPattern,
2511 max_count: usize,
2512 ) -> Vec<DiscoveryCandidate> {
2513 let mut predictions = Vec::new();
2514
2515 match pattern.pattern_type {
2516 AllocationPatternType::Sequential => {
2517 for i in 1..=max_count as u16 {
2519 let predicted_port = pattern.base_port.wrapping_add(i);
2520 if self.is_valid_port(predicted_port) {
2521 predictions.push(
2522 self.create_predicted_candidate(predicted_port, pattern.confidence),
2523 );
2524 }
2525 }
2526 }
2527 AllocationPatternType::FixedStride => {
2528 for i in 1..=max_count as u16 {
2530 let predicted_port = pattern.base_port.wrapping_add(pattern.stride * i);
2531 if self.is_valid_port(predicted_port) {
2532 predictions.push(
2533 self.create_predicted_candidate(predicted_port, pattern.confidence),
2534 );
2535 }
2536 }
2537 }
2538 AllocationPatternType::PoolBased => {
2539 if let Some((min_port, max_port)) = pattern.pool_boundaries {
2541 let pool_size = max_port - min_port + 1;
2542 let step = (pool_size / max_count as u16).max(1);
2543
2544 for i in 0..max_count as u16 {
2545 let predicted_port = min_port + (i * step);
2546 if predicted_port <= max_port && self.is_valid_port(predicted_port) {
2547 predictions.push(self.create_predicted_candidate(
2548 predicted_port,
2549 pattern.confidence * 0.8,
2550 ));
2551 }
2552 }
2553 }
2554 }
2555 AllocationPatternType::TimeBased => {
2556 for i in 1..=max_count as u16 {
2559 let predicted_port = pattern.base_port.wrapping_add(i);
2560 if self.is_valid_port(predicted_port) {
2561 predictions.push(
2562 self.create_predicted_candidate(
2563 predicted_port,
2564 pattern.confidence * 0.6,
2565 ),
2566 );
2567 }
2568 }
2569 }
2570 AllocationPatternType::Random | AllocationPatternType::Unknown => {
2571 predictions
2573 .extend(self.generate_statistical_predictions(pattern.base_port, max_count));
2574 }
2575 }
2576
2577 predictions
2578 }
2579
2580 fn generate_heuristic_predictions(
2582 &self,
2583 recent_events: &[&PortAllocationEvent],
2584 max_count: usize,
2585 ) -> Vec<DiscoveryCandidate> {
2586 let mut predictions = Vec::new();
2587
2588 if let Some(latest_event) = recent_events.first() {
2589 let base_port = latest_event.port;
2590
2591 for i in 1..=(max_count / 3) as u16 {
2595 let predicted_port = base_port.wrapping_add(i);
2596 if self.is_valid_port(predicted_port) {
2597 predictions.push(self.create_predicted_candidate(predicted_port, 0.7));
2598 }
2599 }
2600
2601 if base_port % 2 == 0 {
2603 let predicted_port = base_port + 1;
2604 if self.is_valid_port(predicted_port) {
2605 predictions.push(self.create_predicted_candidate(predicted_port, 0.6));
2606 }
2607 }
2608
2609 for stride in [2, 4, 8, 16] {
2611 if predictions.len() >= max_count {
2612 break;
2613 }
2614 let predicted_port = base_port.wrapping_add(stride);
2615 if self.is_valid_port(predicted_port) {
2616 predictions.push(self.create_predicted_candidate(predicted_port, 0.5));
2617 }
2618 }
2619
2620 if recent_events.len() >= 2 {
2622 let stride = recent_events[0].port.wrapping_sub(recent_events[1].port);
2623 if stride > 0 && stride <= 100 {
2624 for i in 1..=3 {
2626 if predictions.len() >= max_count {
2627 break;
2628 }
2629 let predicted_port = base_port.wrapping_add(stride * i);
2630 if self.is_valid_port(predicted_port) {
2631 predictions.push(self.create_predicted_candidate(predicted_port, 0.4));
2632 }
2633 }
2634 }
2635 }
2636 }
2637
2638 predictions.truncate(max_count);
2639 predictions
2640 }
2641
2642 fn generate_statistical_predictions(
2644 &self,
2645 base_port: u16,
2646 max_count: usize,
2647 ) -> Vec<DiscoveryCandidate> {
2648 let mut predictions = Vec::new();
2649
2650 let common_ranges = [
2652 (1024, 5000), (5000, 10000), (10000, 20000), (32768, 65535), ];
2657
2658 let current_range = common_ranges
2660 .iter()
2661 .find(|(min, max)| base_port >= *min && base_port <= *max)
2662 .copied()
2663 .unwrap_or((1024, 65535));
2664
2665 let range_size = current_range.1 - current_range.0;
2667 let step = (range_size / max_count as u16).max(1);
2668
2669 for i in 0..max_count {
2670 let offset = (i as u16 * step) % range_size;
2671 let predicted_port = current_range.0 + offset;
2672
2673 if self.is_valid_port(predicted_port) && predicted_port != base_port {
2674 predictions.push(self.create_predicted_candidate(predicted_port, 0.3));
2675 }
2676 }
2677
2678 predictions
2679 }
2680
2681 fn is_valid_port(&self, port: u16) -> bool {
2683 const COMMON_PORTS_TO_AVOID: &[u16] = &[
2687 21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 3389, 5432, 3306, 6379, 27017, ];
2703
2704 port != 0 && port >= 1024 && !COMMON_PORTS_TO_AVOID.contains(&port)
2705 }
2706
2707 fn create_predicted_candidate(&self, port: u16, confidence: f64) -> DiscoveryCandidate {
2709 let base_priority = 50; let priority = (base_priority as f64 * confidence) as u32;
2713
2714 DiscoveryCandidate {
2715 address: SocketAddr::new(
2716 "0.0.0.0".parse().unwrap(), port,
2718 ),
2719 priority,
2720 source: DiscoverySourceType::Predicted,
2721 state: CandidateState::New,
2722 }
2723 }
2724
2725 pub(crate) fn analyze_allocation_patterns(
2727 &self,
2728 history: &VecDeque<PortAllocationEvent>,
2729 ) -> Option<PortAllocationPattern> {
2730 if history.len() < 3 {
2731 return None;
2732 }
2733
2734 let recent_ports: Vec<u16> = history
2735 .iter()
2736 .rev()
2737 .take(10)
2738 .map(|event| event.port)
2739 .collect();
2740
2741 if let Some(pattern) = self.detect_sequential_pattern(&recent_ports) {
2743 return Some(pattern);
2744 }
2745
2746 if let Some(pattern) = self.detect_stride_pattern(&recent_ports) {
2748 return Some(pattern);
2749 }
2750
2751 if let Some(pattern) = self.detect_pool_pattern(&recent_ports) {
2753 return Some(pattern);
2754 }
2755
2756 if let Some(pattern) = self.detect_time_based_pattern(history) {
2758 return Some(pattern);
2759 }
2760
2761 None
2762 }
2763
2764 fn detect_sequential_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2766 if ports.len() < 3 {
2767 return None;
2768 }
2769
2770 let mut sequential_count = 0;
2771 let mut total_comparisons = 0;
2772
2773 for i in 1..ports.len() {
2774 total_comparisons += 1;
2775 let diff = ports[i - 1].wrapping_sub(ports[i]);
2776 if diff == 1 {
2777 sequential_count += 1;
2778 }
2779 }
2780
2781 let sequential_ratio = sequential_count as f64 / total_comparisons as f64;
2782
2783 if sequential_ratio >= 0.6 {
2784 let confidence = (sequential_ratio * 0.9).min(0.9); Some(PortAllocationPattern {
2788 pattern_type: AllocationPatternType::Sequential,
2789 base_port: ports[0],
2790 stride: 1,
2791 pool_boundaries: None,
2792 confidence,
2793 })
2794 } else {
2795 None
2796 }
2797 }
2798
2799 fn detect_stride_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2801 if ports.len() < 4 {
2802 return None;
2803 }
2804
2805 let mut diffs = Vec::new();
2807 for i in 1..ports.len() {
2808 let diff = ports[i - 1].wrapping_sub(ports[i]);
2809 if diff > 0 && diff <= 1000 {
2810 diffs.push(diff);
2812 }
2813 }
2814
2815 if diffs.len() < 2 {
2816 return None;
2817 }
2818
2819 let mut diff_counts = std::collections::HashMap::new();
2821 for &diff in &diffs {
2822 *diff_counts.entry(diff).or_insert(0) += 1;
2823 }
2824
2825 let (most_common_diff, count) = diff_counts
2826 .iter()
2827 .max_by_key(|&(_, &count)| count)
2828 .map(|(&diff, &count)| (diff, count))?;
2829
2830 let consistency_ratio = count as f64 / diffs.len() as f64;
2831
2832 if consistency_ratio >= 0.5 && most_common_diff > 1 {
2833 let confidence = (consistency_ratio * 0.8).min(0.8); Some(PortAllocationPattern {
2837 pattern_type: AllocationPatternType::FixedStride,
2838 base_port: ports[0],
2839 stride: most_common_diff,
2840 pool_boundaries: None,
2841 confidence,
2842 })
2843 } else {
2844 None
2845 }
2846 }
2847
2848 fn detect_pool_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2850 if ports.len() < 5 {
2851 return None;
2852 }
2853
2854 let min_port = *ports.iter().min()?;
2855 let max_port = *ports.iter().max()?;
2856 let range = max_port - min_port;
2857
2858 if range > 0 && range <= 10000 {
2860 let expected_step = range / (ports.len() as u16 - 1);
2863 let mut uniform_score = 0.0;
2864
2865 let mut sorted_ports = ports.to_vec();
2866 sorted_ports.sort_unstable();
2867
2868 for i in 1..sorted_ports.len() {
2869 let actual_step = sorted_ports[i] - sorted_ports[i - 1];
2870 let step_diff = (actual_step as i32 - expected_step as i32).abs() as f64;
2871 let normalized_diff = step_diff / expected_step as f64;
2872 uniform_score += 1.0 - normalized_diff.min(1.0);
2873 }
2874
2875 uniform_score /= (sorted_ports.len() - 1) as f64;
2876
2877 if uniform_score >= 0.4 {
2878 let confidence = (uniform_score * 0.7).min(0.7); Some(PortAllocationPattern {
2882 pattern_type: AllocationPatternType::PoolBased,
2883 base_port: min_port,
2884 stride: expected_step,
2885 pool_boundaries: Some((min_port, max_port)),
2886 confidence,
2887 })
2888 } else {
2889 None
2890 }
2891 } else {
2892 None
2893 }
2894 }
2895
2896 fn detect_time_based_pattern(
2898 &self,
2899 history: &VecDeque<PortAllocationEvent>,
2900 ) -> Option<PortAllocationPattern> {
2901 if history.len() < 4 {
2902 return None;
2903 }
2904
2905 let mut time_intervals = Vec::new();
2907 let events: Vec<_> = history.iter().collect();
2908
2909 for i in 1..events.len() {
2910 let interval = events[i - 1].timestamp.duration_since(events[i].timestamp);
2911 time_intervals.push(interval);
2912 }
2913
2914 if time_intervals.is_empty() {
2915 return None;
2916 }
2917
2918 let avg_interval =
2920 time_intervals.iter().sum::<std::time::Duration>() / time_intervals.len() as u32;
2921
2922 let mut consistency_score = 0.0;
2923 for interval in &time_intervals {
2924 let diff = (*interval).abs_diff(avg_interval);
2925
2926 let normalized_diff = diff.as_millis() as f64 / avg_interval.as_millis() as f64;
2927 consistency_score += 1.0 - normalized_diff.min(1.0);
2928 }
2929
2930 consistency_score /= time_intervals.len() as f64;
2931
2932 if consistency_score >= 0.6
2933 && avg_interval.as_millis() > 100
2934 && avg_interval.as_millis() < 10000
2935 {
2936 let confidence = (consistency_score * 0.6).min(0.6); Some(PortAllocationPattern {
2939 pattern_type: AllocationPatternType::TimeBased,
2940 base_port: events[0].port,
2941 stride: 1, pool_boundaries: None,
2943 confidence,
2944 })
2945 } else {
2946 None
2947 }
2948 }
2949
2950 pub(crate) fn generate_confidence_scored_predictions(
2952 &mut self,
2953 base_address: SocketAddr,
2954 pattern_analysis: &PatternAnalysisState,
2955 max_count: usize,
2956 ) -> Vec<(DiscoveryCandidate, f64)> {
2957 let mut scored_predictions = Vec::new();
2958
2959 let predictions = self.generate_predictions(pattern_analysis, max_count);
2961
2962 for mut prediction in predictions {
2963 prediction.address = SocketAddr::new(base_address.ip(), prediction.address.port());
2965
2966 let confidence =
2968 self.calculate_prediction_confidence(&prediction, pattern_analysis, base_address);
2969
2970 scored_predictions.push((prediction, confidence));
2971 }
2972
2973 scored_predictions
2975 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2976
2977 scored_predictions
2978 }
2979
2980 fn calculate_prediction_confidence(
2982 &self,
2983 prediction: &DiscoveryCandidate,
2984 pattern_analysis: &PatternAnalysisState,
2985 base_address: SocketAddr,
2986 ) -> f64 {
2987 let mut confidence = 0.5; if let Some(ref pattern) = pattern_analysis.detected_pattern {
2991 confidence += pattern.confidence * 0.3;
2992 }
2993
2994 confidence += pattern_analysis.prediction_accuracy * 0.2;
2996
2997 let port_distance = (prediction.address.port() as i32 - base_address.port() as i32).abs();
2999 let proximity_score = if port_distance <= 10 {
3000 0.2
3001 } else if port_distance <= 100 {
3002 0.1
3003 } else {
3004 0.0
3005 };
3006 confidence += proximity_score;
3007
3008 let port_range_score = match prediction.address.port() {
3010 1024..=4999 => 0.1, 5000..=9999 => 0.15, 10000..=20000 => 0.1, 32768..=65535 => 0.05, _ => 0.0,
3015 };
3016 confidence += port_range_score;
3017
3018 confidence.max(0.0).min(1.0)
3020 }
3021
3022 pub(crate) fn update_pattern_analysis(
3024 &self,
3025 pattern_analysis: &mut PatternAnalysisState,
3026 new_event: PortAllocationEvent,
3027 ) {
3028 pattern_analysis.allocation_history.push_back(new_event);
3030
3031 if pattern_analysis.allocation_history.len() > 20 {
3033 pattern_analysis.allocation_history.pop_front();
3034 }
3035
3036 pattern_analysis.detected_pattern =
3038 self.analyze_allocation_patterns(&pattern_analysis.allocation_history);
3039
3040 if let Some(ref pattern) = pattern_analysis.detected_pattern {
3042 pattern_analysis.confidence_level = pattern.confidence;
3043 } else {
3044 pattern_analysis.confidence_level *= 0.9; }
3046
3047 pattern_analysis.prediction_accuracy *= 0.95;
3051 }
3052}
3053
3054#[derive(Debug)]
3056pub(crate) struct BootstrapNodeManager {
3057 config: DiscoveryConfig,
3058 bootstrap_nodes: HashMap<BootstrapNodeId, BootstrapNodeInfo>,
3059 health_stats: HashMap<BootstrapNodeId, BootstrapHealthStats>,
3060 performance_tracker: BootstrapPerformanceTracker,
3061 last_health_check: Option<Instant>,
3062 health_check_interval: Duration,
3063 failover_threshold: f64,
3064 discovery_sources: Vec<BootstrapDiscoverySource>,
3065}
3066
3067#[derive(Debug, Clone)]
3069pub(crate) struct BootstrapNodeInfo {
3070 pub address: SocketAddr,
3072 pub last_seen: Instant,
3074 pub can_coordinate: bool,
3076 pub health_status: BootstrapHealthStatus,
3078 pub capabilities: BootstrapCapabilities,
3080 pub priority: u32,
3082 pub discovery_source: BootstrapDiscoverySource,
3084}
3085
3086#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3088pub(crate) enum BootstrapHealthStatus {
3089 Healthy,
3091 Degraded,
3093 Unhealthy,
3095 Unknown,
3097}
3098
3099#[derive(Debug, Clone, Default)]
3101pub(crate) struct BootstrapCapabilities {
3102 pub supports_nat_traversal: bool,
3104 pub supports_ipv6: bool,
3106 pub supports_quic_extensions: bool,
3108 pub max_concurrent_coordinations: u32,
3110 pub supported_quic_versions: Vec<u32>,
3112}
3113
3114#[derive(Debug, Clone, Default)]
3116pub(crate) struct BootstrapHealthStats {
3117 pub connection_attempts: u32,
3119 pub successful_connections: u32,
3121 pub failed_connections: u32,
3123 pub average_rtt: Option<Duration>,
3125 pub recent_rtts: VecDeque<Duration>,
3127 pub last_health_check: Option<Instant>,
3129 pub consecutive_failures: u32,
3131 pub coordination_requests: u32,
3133 pub successful_coordinations: u32,
3135}
3136
3137#[derive(Debug, Default)]
3139pub(crate) struct BootstrapPerformanceTracker {
3140 pub overall_success_rate: f64,
3142 pub average_response_time: Duration,
3144 pub best_performers: Vec<BootstrapNodeId>,
3146 pub failover_nodes: Vec<BootstrapNodeId>,
3148 pub performance_history: VecDeque<PerformanceSnapshot>,
3150}
3151
3152#[derive(Debug, Clone)]
3154pub(crate) struct PerformanceSnapshot {
3155 pub timestamp: Instant,
3156 pub active_nodes: u32,
3157 pub success_rate: f64,
3158 pub average_rtt: Duration,
3159}
3160
3161#[derive(Debug, Clone, PartialEq, Eq)]
3163pub(crate) enum BootstrapDiscoverySource {
3164 Static,
3166 DNS,
3168 DHT,
3170 Multicast,
3172 UserProvided,
3174}
3175
3176impl BootstrapNodeManager {
3177 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3178 Self {
3179 config: config.clone(),
3180 bootstrap_nodes: HashMap::new(),
3181 health_stats: HashMap::new(),
3182 performance_tracker: BootstrapPerformanceTracker::default(),
3183 last_health_check: None,
3184 health_check_interval: Duration::from_secs(30),
3185 failover_threshold: 0.3, discovery_sources: vec![
3187 BootstrapDiscoverySource::Static,
3188 BootstrapDiscoverySource::DNS,
3189 BootstrapDiscoverySource::UserProvided,
3190 ],
3191 }
3192 }
3193
3194 pub(crate) fn update_bootstrap_nodes(&mut self, nodes: Vec<BootstrapNode>) {
3196 let now = Instant::now();
3197
3198 for (i, node) in nodes.into_iter().enumerate() {
3200 let node_id = BootstrapNodeId(i as u64);
3201
3202 let node_info = BootstrapNodeInfo {
3203 address: node.address,
3204 last_seen: node.last_seen,
3205 can_coordinate: node.can_coordinate,
3206 health_status: BootstrapHealthStatus::Unknown,
3207 capabilities: BootstrapCapabilities {
3208 supports_nat_traversal: node.can_coordinate,
3209 supports_ipv6: node.address.is_ipv6(),
3210 supports_quic_extensions: true, max_concurrent_coordinations: 100, supported_quic_versions: vec![1], },
3214 priority: self.calculate_initial_priority(&node),
3215 discovery_source: BootstrapDiscoverySource::UserProvided,
3216 };
3217
3218 self.bootstrap_nodes.insert(node_id, node_info);
3219
3220 self.health_stats.entry(node_id).or_default();
3222 }
3223
3224 info!("Updated {} bootstrap nodes", self.bootstrap_nodes.len());
3225 self.schedule_health_check(now);
3226 }
3227
3228 pub(crate) fn get_active_bootstrap_nodes(&self) -> Vec<BootstrapNodeId> {
3230 let mut active_nodes: Vec<_> = self
3231 .bootstrap_nodes
3232 .iter()
3233 .filter(|(_, node)| {
3234 matches!(
3235 node.health_status,
3236 BootstrapHealthStatus::Healthy | BootstrapHealthStatus::Unknown
3237 )
3238 })
3239 .map(|(&id, node)| (id, node))
3240 .collect();
3241
3242 active_nodes.sort_by(|a, b| {
3244 let health_cmp = self.compare_health_status(a.1.health_status, b.1.health_status);
3246 if health_cmp != std::cmp::Ordering::Equal {
3247 return health_cmp;
3248 }
3249
3250 b.1.priority.cmp(&a.1.priority)
3252 });
3253
3254 active_nodes.into_iter().map(|(id, _)| id).collect()
3255 }
3256
3257 pub(crate) fn get_bootstrap_address(&self, id: BootstrapNodeId) -> Option<SocketAddr> {
3259 self.bootstrap_nodes.get(&id).map(|node| node.address)
3260 }
3261
3262 pub(crate) fn perform_health_check(&mut self, now: Instant) {
3264 if let Some(last_check) = self.last_health_check {
3265 if now.duration_since(last_check) < self.health_check_interval {
3266 return; }
3268 }
3269
3270 debug!(
3271 "Performing health check on {} bootstrap nodes",
3272 self.bootstrap_nodes.len()
3273 );
3274
3275 let node_ids: Vec<BootstrapNodeId> = self.bootstrap_nodes.keys().copied().collect();
3277
3278 for node_id in node_ids {
3279 self.check_node_health(node_id, now);
3280 }
3281
3282 self.update_performance_metrics(now);
3283 self.last_health_check = Some(now);
3284 }
3285
3286 fn check_node_health(&mut self, node_id: BootstrapNodeId, now: Instant) {
3288 let node_info_opt = self.bootstrap_nodes.get(&node_id).cloned();
3290 if node_info_opt.is_none() {
3291 return; }
3293 let node_info_for_priority = node_info_opt.unwrap();
3294 let current_health_status = node_info_for_priority.health_status;
3295
3296 let (_success_rate, new_health_status, _average_rtt) = {
3298 let stats = self.health_stats.get_mut(&node_id).unwrap();
3299
3300 let success_rate = if stats.connection_attempts > 0 {
3302 stats.successful_connections as f64 / stats.connection_attempts as f64
3303 } else {
3304 1.0 };
3306
3307 if !stats.recent_rtts.is_empty() {
3309 let total_rtt: Duration = stats.recent_rtts.iter().sum();
3310 stats.average_rtt = Some(total_rtt / stats.recent_rtts.len() as u32);
3311 }
3312
3313 let new_health_status = if stats.consecutive_failures >= 3 {
3315 BootstrapHealthStatus::Unhealthy
3316 } else if success_rate < self.failover_threshold {
3317 BootstrapHealthStatus::Degraded
3318 } else if success_rate >= 0.8 && stats.consecutive_failures == 0 {
3319 BootstrapHealthStatus::Healthy
3320 } else {
3321 current_health_status };
3323
3324 stats.last_health_check = Some(now);
3325
3326 (success_rate, new_health_status, stats.average_rtt)
3327 };
3328
3329 let stats_snapshot = self.health_stats.get(&node_id).unwrap();
3331 let new_priority = self.calculate_dynamic_priority(&node_info_for_priority, stats_snapshot);
3332
3333 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
3335 if new_health_status != node_info.health_status {
3336 info!(
3337 "Bootstrap node {:?} health status changed: {:?} -> {:?}",
3338 node_id, node_info.health_status, new_health_status
3339 );
3340 node_info.health_status = new_health_status;
3341 }
3342
3343 node_info.priority = new_priority;
3344 }
3345 }
3346
3347 pub(crate) fn record_connection_attempt(
3349 &mut self,
3350 node_id: BootstrapNodeId,
3351 success: bool,
3352 rtt: Option<Duration>,
3353 ) {
3354 if let Some(stats) = self.health_stats.get_mut(&node_id) {
3355 stats.connection_attempts += 1;
3356
3357 if success {
3358 stats.successful_connections += 1;
3359 stats.consecutive_failures = 0;
3360
3361 if let Some(rtt) = rtt {
3362 stats.recent_rtts.push_back(rtt);
3363 if stats.recent_rtts.len() > 10 {
3364 stats.recent_rtts.pop_front();
3365 }
3366 }
3367 } else {
3368 stats.failed_connections += 1;
3369 stats.consecutive_failures += 1;
3370 }
3371 }
3372
3373 if success {
3375 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
3376 node_info.last_seen = Instant::now();
3377 }
3378 }
3379 }
3380
3381 pub(crate) fn record_coordination_result(&mut self, node_id: BootstrapNodeId, success: bool) {
3383 if let Some(stats) = self.health_stats.get_mut(&node_id) {
3384 stats.coordination_requests += 1;
3385 if success {
3386 stats.successful_coordinations += 1;
3387 }
3388 }
3389 }
3390
3391 pub(crate) fn get_best_performers(&self, count: usize) -> Vec<BootstrapNodeId> {
3393 let mut nodes_with_scores: Vec<_> = self
3394 .bootstrap_nodes
3395 .iter()
3396 .filter_map(|(&id, node)| {
3397 if matches!(node.health_status, BootstrapHealthStatus::Healthy) {
3398 let score = self.calculate_performance_score(id, node);
3399 Some((id, score))
3400 } else {
3401 None
3402 }
3403 })
3404 .collect();
3405
3406 nodes_with_scores
3407 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3408
3409 nodes_with_scores
3410 .into_iter()
3411 .take(count)
3412 .map(|(id, _)| id)
3413 .collect()
3414 }
3415
3416 pub(crate) fn discover_new_nodes(&mut self) -> Result<Vec<BootstrapNodeInfo>, String> {
3418 let mut discovered_nodes = Vec::new();
3419
3420 if let Ok(dns_nodes) = self.discover_via_dns() {
3422 discovered_nodes.extend(dns_nodes);
3423 }
3424
3425 if let Ok(multicast_nodes) = self.discover_via_multicast() {
3427 discovered_nodes.extend(multicast_nodes);
3428 }
3429
3430 for node in &discovered_nodes {
3432 let node_id = BootstrapNodeId(rand::random());
3433 self.bootstrap_nodes.insert(node_id, node.clone());
3434 self.health_stats
3435 .insert(node_id, BootstrapHealthStats::default());
3436 }
3437
3438 if !discovered_nodes.is_empty() {
3439 info!("Discovered {} new bootstrap nodes", discovered_nodes.len());
3440 }
3441
3442 Ok(discovered_nodes)
3443 }
3444
3445 fn discover_via_dns(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
3447 debug!("DNS-based bootstrap discovery not yet implemented");
3450 Ok(Vec::new())
3451 }
3452
3453 fn discover_via_multicast(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
3455 debug!("Multicast-based bootstrap discovery not yet implemented");
3458 Ok(Vec::new())
3459 }
3460
3461 fn calculate_initial_priority(&self, node: &BootstrapNode) -> u32 {
3463 let mut priority = 100; if node.can_coordinate {
3466 priority += 50;
3467 }
3468
3469 if let Some(rtt) = node.rtt {
3470 if rtt < Duration::from_millis(50) {
3471 priority += 30;
3472 } else if rtt < Duration::from_millis(100) {
3473 priority += 20;
3474 } else if rtt < Duration::from_millis(200) {
3475 priority += 10;
3476 }
3477 }
3478
3479 if node.address.is_ipv6() {
3481 priority += 10;
3482 }
3483
3484 priority
3485 }
3486
3487 fn calculate_dynamic_priority(
3489 &self,
3490 node_info: &BootstrapNodeInfo,
3491 stats: &BootstrapHealthStats,
3492 ) -> u32 {
3493 let mut priority = node_info.priority;
3494
3495 let success_rate = if stats.connection_attempts > 0 {
3497 stats.successful_connections as f64 / stats.connection_attempts as f64
3498 } else {
3499 1.0
3500 };
3501
3502 priority = (priority as f64 * success_rate) as u32;
3503
3504 if let Some(avg_rtt) = stats.average_rtt {
3506 if avg_rtt < Duration::from_millis(50) {
3507 priority += 20;
3508 } else if avg_rtt > Duration::from_millis(500) {
3509 priority = priority.saturating_sub(20);
3510 }
3511 }
3512
3513 priority = priority.saturating_sub(stats.consecutive_failures * 10);
3515
3516 priority.max(1) }
3518
3519 fn calculate_performance_score(
3521 &self,
3522 node_id: BootstrapNodeId,
3523 _node_info: &BootstrapNodeInfo,
3524 ) -> f64 {
3525 let stats = self.health_stats.get(&node_id).unwrap();
3526
3527 let mut score = 0.0;
3528
3529 let success_rate = if stats.connection_attempts > 0 {
3531 stats.successful_connections as f64 / stats.connection_attempts as f64
3532 } else {
3533 1.0
3534 };
3535 score += success_rate * 0.4;
3536
3537 if let Some(avg_rtt) = stats.average_rtt {
3539 let rtt_score = (1000.0 - avg_rtt.as_millis() as f64).max(0.0) / 1000.0;
3540 score += rtt_score * 0.3;
3541 } else {
3542 score += 0.3; }
3544
3545 let coord_success_rate = if stats.coordination_requests > 0 {
3547 stats.successful_coordinations as f64 / stats.coordination_requests as f64
3548 } else {
3549 1.0
3550 };
3551 score += coord_success_rate * 0.2;
3552
3553 let stability_score = if stats.consecutive_failures == 0 {
3555 1.0
3556 } else {
3557 1.0 / (stats.consecutive_failures as f64 + 1.0)
3558 };
3559 score += stability_score * 0.1;
3560
3561 score
3562 }
3563
3564 fn compare_health_status(
3566 &self,
3567 a: BootstrapHealthStatus,
3568 b: BootstrapHealthStatus,
3569 ) -> std::cmp::Ordering {
3570 use std::cmp::Ordering;
3571
3572 match (a, b) {
3573 (BootstrapHealthStatus::Healthy, BootstrapHealthStatus::Healthy) => Ordering::Equal,
3574 (BootstrapHealthStatus::Healthy, _) => Ordering::Less, (_, BootstrapHealthStatus::Healthy) => Ordering::Greater,
3576 (BootstrapHealthStatus::Unknown, BootstrapHealthStatus::Unknown) => Ordering::Equal,
3577 (BootstrapHealthStatus::Unknown, _) => Ordering::Less, (_, BootstrapHealthStatus::Unknown) => Ordering::Greater,
3579 (BootstrapHealthStatus::Degraded, BootstrapHealthStatus::Degraded) => Ordering::Equal,
3580 (BootstrapHealthStatus::Degraded, _) => Ordering::Less, (_, BootstrapHealthStatus::Degraded) => Ordering::Greater,
3582 (BootstrapHealthStatus::Unhealthy, BootstrapHealthStatus::Unhealthy) => Ordering::Equal,
3583 }
3584 }
3585
3586 fn update_performance_metrics(&mut self, now: Instant) {
3588 let mut total_attempts = 0;
3589 let mut total_successes = 0;
3590 let mut total_rtt = Duration::ZERO;
3591 let mut rtt_count = 0;
3592
3593 for stats in self.health_stats.values() {
3594 total_attempts += stats.connection_attempts;
3595 total_successes += stats.successful_connections;
3596
3597 if let Some(avg_rtt) = stats.average_rtt {
3598 total_rtt += avg_rtt;
3599 rtt_count += 1;
3600 }
3601 }
3602
3603 self.performance_tracker.overall_success_rate = if total_attempts > 0 {
3604 total_successes as f64 / total_attempts as f64
3605 } else {
3606 1.0
3607 };
3608
3609 self.performance_tracker.average_response_time = if rtt_count > 0 {
3610 total_rtt / rtt_count
3611 } else {
3612 Duration::from_millis(100) };
3614
3615 self.performance_tracker.best_performers = self.get_best_performers(5);
3617
3618 let snapshot = PerformanceSnapshot {
3620 timestamp: now,
3621 active_nodes: self.get_active_bootstrap_nodes().len() as u32,
3622 success_rate: self.performance_tracker.overall_success_rate,
3623 average_rtt: self.performance_tracker.average_response_time,
3624 };
3625
3626 self.performance_tracker
3627 .performance_history
3628 .push_back(snapshot);
3629 if self.performance_tracker.performance_history.len() > 100 {
3630 self.performance_tracker.performance_history.pop_front();
3631 }
3632 }
3633
3634 fn schedule_health_check(&mut self, _now: Instant) {
3636 }
3639
3640 pub(crate) fn get_performance_stats(&self) -> &BootstrapPerformanceTracker {
3642 &self.performance_tracker
3643 }
3644
3645 pub(crate) fn get_node_health_stats(
3647 &self,
3648 node_id: BootstrapNodeId,
3649 ) -> Option<&BootstrapHealthStats> {
3650 self.health_stats.get(&node_id)
3651 }
3652}
3653
3654#[derive(Debug)]
3656pub(crate) struct DiscoveryCache {
3657 config: DiscoveryConfig,
3658}
3659
3660impl DiscoveryCache {
3661 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3662 Self {
3663 config: config.clone(),
3664 }
3665 }
3666}
3667
3668pub(crate) fn create_platform_interface_discovery() -> Box<dyn NetworkInterfaceDiscovery + Send> {
3670 #[cfg(target_os = "windows")]
3671 return Box::new(WindowsInterfaceDiscovery::new());
3672
3673 #[cfg(target_os = "linux")]
3674 return Box::new(LinuxInterfaceDiscovery::new());
3675
3676 #[cfg(all(target_os = "macos", feature = "network-discovery"))]
3677 return Box::new(MacOSInterfaceDiscovery::new());
3678
3679 #[cfg(all(target_os = "macos", not(feature = "network-discovery")))]
3680 return Box::new(GenericInterfaceDiscovery::new());
3681
3682 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
3683 return Box::new(GenericInterfaceDiscovery::new());
3684}
3685
3686pub(crate) struct GenericInterfaceDiscovery {
3696 scan_complete: bool,
3697}
3698
3699impl GenericInterfaceDiscovery {
3700 pub(crate) fn new() -> Self {
3701 Self {
3702 scan_complete: false,
3703 }
3704 }
3705}
3706
3707impl NetworkInterfaceDiscovery for GenericInterfaceDiscovery {
3708 fn start_scan(&mut self) -> Result<(), String> {
3709 self.scan_complete = true;
3711 Ok(())
3712 }
3713
3714 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>> {
3715 if self.scan_complete {
3716 self.scan_complete = false;
3717 Some(vec![NetworkInterface {
3718 name: "generic".to_string(),
3719 addresses: vec!["127.0.0.1:0".parse().unwrap()],
3720 is_up: true,
3721 is_wireless: false,
3722 mtu: Some(1500),
3723 }])
3724 } else {
3725 None
3726 }
3727 }
3728}
3729
3730impl std::fmt::Display for DiscoveryError {
3731 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3732 match self {
3733 Self::NoLocalInterfaces => write!(f, "no local network interfaces found"),
3734 Self::AllBootstrapsFailed => write!(f, "all bootstrap node queries failed"),
3735 Self::DiscoveryTimeout => write!(f, "discovery process timed out"),
3736 Self::InsufficientCandidates { found, required } => {
3737 write!(f, "insufficient candidates found: {found} < {required}")
3738 }
3739 Self::NetworkError(msg) => write!(f, "network error: {msg}"),
3740 Self::ConfigurationError(msg) => write!(f, "configuration error: {msg}"),
3741 Self::InternalError(msg) => write!(f, "internal error: {msg}"),
3742 }
3743 }
3744}
3745
3746impl std::error::Error for DiscoveryError {}
3747
3748pub mod test_utils {
3750 use super::*;
3751
3752 pub fn calculate_address_priority(address: &IpAddr) -> u32 {
3754 let mut priority = 100; match address {
3756 IpAddr::V4(ipv4) => {
3757 if ipv4.is_private() {
3758 priority += 50; }
3760 }
3761 IpAddr::V6(ipv6) => {
3762 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
3765 let segments = ipv6.segments();
3766 if segments[0] & 0xE000 == 0x2000 {
3767 priority += 60;
3769 } else if segments[0] & 0xFFC0 == 0xFE80 {
3770 priority += 20;
3772 } else if segments[0] & 0xFE00 == 0xFC00 {
3773 priority += 40;
3775 } else {
3776 priority += 30;
3778 }
3779 }
3780
3781 priority += 10; }
3784 }
3785 priority
3786 }
3787
3788 pub fn is_valid_address(address: &IpAddr) -> bool {
3790 match address {
3791 IpAddr::V4(ipv4) => !ipv4.is_loopback() && !ipv4.is_unspecified(),
3792 IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
3793 }
3794 }
3795}
3796
3797#[cfg(test)]
3798mod tests {
3799 use super::*;
3800
3801 fn create_test_manager() -> CandidateDiscoveryManager {
3802 let config = DiscoveryConfig {
3803 total_timeout: Duration::from_secs(30),
3804 local_scan_timeout: Duration::from_secs(5),
3805 bootstrap_query_timeout: Duration::from_secs(10),
3806 max_query_retries: 3,
3807 max_candidates: 50,
3808 enable_symmetric_prediction: true,
3809 min_bootstrap_consensus: 2,
3810 interface_cache_ttl: Duration::from_secs(300),
3811 server_reflexive_cache_ttl: Duration::from_secs(600),
3812 bound_address: None,
3813 };
3814 CandidateDiscoveryManager::new(config)
3815 }
3816
3817 #[test]
3818 fn test_accept_quic_discovered_addresses() {
3819 let mut manager = create_test_manager();
3820 let peer_id = PeerId([1; 32]);
3821
3822 manager.start_discovery(peer_id, vec![]).unwrap();
3824
3825 let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3827 let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3828
3829 assert!(result.is_ok());
3830
3831 if let Some(session) = manager.active_sessions.get(&peer_id) {
3833 let found = session.discovered_candidates.iter().any(|c| {
3834 c.address == discovered_addr
3835 && matches!(c.source, DiscoverySourceType::ServerReflexive)
3836 });
3837 assert!(found, "QUIC-discovered address should be in candidates");
3838 }
3839 }
3840
3841 #[test]
3842 fn test_accept_quic_discovered_addresses_no_session() {
3843 let mut manager = create_test_manager();
3844 let peer_id = PeerId([1; 32]);
3845 let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3846
3847 let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3849
3850 assert!(result.is_err());
3851 match result {
3852 Err(DiscoveryError::InternalError(msg)) => {
3853 assert!(msg.contains("No active discovery session"));
3854 }
3855 _ => panic!("Expected InternalError for missing session"),
3856 }
3857 }
3858
3859 #[test]
3860 fn test_accept_quic_discovered_addresses_deduplication() {
3861 let mut manager = create_test_manager();
3862 let peer_id = PeerId([1; 32]);
3863
3864 manager.start_discovery(peer_id, vec![]).unwrap();
3866
3867 let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3869 let result1 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3870 let result2 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3871
3872 assert!(result1.is_ok());
3873 assert!(result2.is_ok()); if let Some(session) = manager.active_sessions.get(&peer_id) {
3877 let count = session
3878 .discovered_candidates
3879 .iter()
3880 .filter(|c| c.address == discovered_addr)
3881 .count();
3882 assert_eq!(count, 1, "Should not have duplicate addresses");
3883 }
3884 }
3885
3886 #[test]
3887 fn test_accept_quic_discovered_addresses_priority() {
3888 let mut manager = create_test_manager();
3889 let peer_id = PeerId([1; 32]);
3890
3891 manager.start_discovery(peer_id, vec![]).unwrap();
3893
3894 let public_addr = "8.8.8.8:5000".parse().unwrap();
3896 let private_addr = "192.168.1.100:5000".parse().unwrap();
3897 let ipv6_addr = "[2001:db8::1]:5000".parse().unwrap();
3898
3899 manager
3900 .accept_quic_discovered_address(peer_id, public_addr)
3901 .unwrap();
3902 manager
3903 .accept_quic_discovered_address(peer_id, private_addr)
3904 .unwrap();
3905 manager
3906 .accept_quic_discovered_address(peer_id, ipv6_addr)
3907 .unwrap();
3908
3909 if let Some(session) = manager.active_sessions.get(&peer_id) {
3911 for candidate in &session.discovered_candidates {
3912 assert!(
3913 candidate.priority > 0,
3914 "All candidates should have non-zero priority"
3915 );
3916
3917 if candidate.address == ipv6_addr {
3919 let ipv4_priority = session
3920 .discovered_candidates
3921 .iter()
3922 .find(|c| c.address == public_addr)
3923 .map(|c| c.priority)
3924 .unwrap();
3925
3926 assert!(candidate.priority >= ipv4_priority);
3928 }
3929 }
3930 }
3931 }
3932
3933 #[test]
3934 fn test_accept_quic_discovered_addresses_event_generation() {
3935 let mut manager = create_test_manager();
3936 let peer_id = PeerId([1; 32]);
3937
3938 manager.start_discovery(peer_id, vec![]).unwrap();
3940
3941 let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3943 manager
3944 .accept_quic_discovered_address(peer_id, discovered_addr)
3945 .unwrap();
3946
3947 let events = manager.poll_discovery_progress(peer_id);
3949
3950 let has_event = events.iter().any(|e| {
3952 matches!(e,
3953 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
3954 if candidate.address == discovered_addr
3955 )
3956 });
3957
3958 assert!(
3959 has_event,
3960 "Should generate discovery event for QUIC-discovered address"
3961 );
3962 }
3963
3964 #[test]
3965 fn test_discovery_completes_without_server_reflexive_phase() {
3966 let mut manager = create_test_manager();
3967 let peer_id = PeerId([1; 32]);
3968
3969 manager.start_discovery(peer_id, vec![]).unwrap();
3971
3972 let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3974 manager
3975 .accept_quic_discovered_address(peer_id, discovered_addr)
3976 .unwrap();
3977
3978 let status = manager.get_discovery_status(peer_id).unwrap();
3980
3981 match &status.phase {
3983 DiscoveryPhase::ServerReflexiveQuerying { .. } => {
3984 panic!("Should not be in ServerReflexiveQuerying phase when using QUIC discovery");
3985 }
3986 _ => {} }
3988 }
3989
3990 #[test]
3991 fn test_no_bootstrap_queries_when_using_quic_discovery() {
3992 let mut manager = create_test_manager();
3993 let peer_id = PeerId([1; 32]);
3994
3995 manager.start_discovery(peer_id, vec![]).unwrap();
3997
3998 let addr1 = "192.168.1.100:5000".parse().unwrap();
4000 let addr2 = "8.8.8.8:5000".parse().unwrap();
4001 manager
4002 .accept_quic_discovered_address(peer_id, addr1)
4003 .unwrap();
4004 manager
4005 .accept_quic_discovered_address(peer_id, addr2)
4006 .unwrap();
4007
4008 let status = manager.get_discovery_status(peer_id).unwrap();
4010
4011 assert!(status.discovered_candidates.len() >= 2);
4013
4014 if let Some(session) = manager.active_sessions.get(&peer_id) {
4016 assert_eq!(
4018 session.statistics.bootstrap_queries_sent, 0,
4019 "Should not query bootstrap nodes when using QUIC discovery"
4020 );
4021 }
4022 }
4023
4024 #[test]
4025 fn test_priority_differences_quic_vs_placeholder() {
4026 let mut manager = create_test_manager();
4027 let peer_id = PeerId([1; 32]);
4028
4029 manager.start_discovery(peer_id, vec![]).unwrap();
4031
4032 let discovered_addr = "8.8.8.8:5000".parse().unwrap();
4034 manager
4035 .accept_quic_discovered_address(peer_id, discovered_addr)
4036 .unwrap();
4037
4038 if let Some(session) = manager.active_sessions.get(&peer_id) {
4040 let candidate = session
4041 .discovered_candidates
4042 .iter()
4043 .find(|c| c.address == discovered_addr)
4044 .expect("Should find the discovered address");
4045
4046 assert!(
4048 candidate.priority > 100,
4049 "QUIC-discovered address should have good priority"
4050 );
4051 assert!(candidate.priority < 300, "Priority should be reasonable");
4052
4053 assert!(matches!(
4055 candidate.source,
4056 DiscoverySourceType::ServerReflexive
4057 ));
4058 }
4059 }
4060
4061 #[test]
4062 fn test_quic_discovered_address_priority_calculation() {
4063 let mut manager = create_test_manager();
4065 let peer_id = PeerId([1; 32]);
4066
4067 manager.start_discovery(peer_id, vec![]).unwrap();
4069
4070 let test_cases = vec![
4072 ("1.2.3.4:5678", (250, 260), "Public IPv4"),
4074 ("192.168.1.100:9000", (240, 250), "Private IPv4"),
4075 ("[2001:db8::1]:5678", (260, 280), "Global IPv6"),
4076 ("[fe80::1]:5678", (220, 240), "Link-local IPv6"),
4077 ("[fc00::1]:5678", (240, 260), "Unique local IPv6"),
4078 ("10.0.0.1:9000", (240, 250), "Private IPv4 (10.x)"),
4079 ("172.16.0.1:9000", (240, 250), "Private IPv4 (172.16.x)"),
4080 ];
4081
4082 for (addr_str, (min_priority, max_priority), description) in test_cases {
4083 let addr: SocketAddr = addr_str.parse().unwrap();
4084 manager
4085 .accept_quic_discovered_address(peer_id, addr)
4086 .unwrap();
4087
4088 let session = manager.active_sessions.get(&peer_id).unwrap();
4089 let candidate = session
4090 .discovered_candidates
4091 .iter()
4092 .find(|c| c.address == addr)
4093 .unwrap_or_else(|| panic!("No candidate found for {description}"));
4094
4095 assert!(
4096 candidate.priority >= min_priority && candidate.priority <= max_priority,
4097 "{} priority {} not in range [{}, {}]",
4098 description,
4099 candidate.priority,
4100 min_priority,
4101 max_priority
4102 );
4103 }
4104 }
4105
4106 #[test]
4107 fn test_quic_discovered_priority_factors() {
4108 let manager = create_test_manager();
4110
4111 let base_priority =
4113 manager.calculate_quic_discovered_priority(&"1.2.3.4:5678".parse().unwrap());
4114 assert_eq!(
4115 base_priority, 255,
4116 "Base priority should be 255 for public IPv4"
4117 );
4118
4119 let ipv6_priority =
4121 manager.calculate_quic_discovered_priority(&"[2001:db8::1]:5678".parse().unwrap());
4122 assert!(
4123 ipv6_priority > base_priority,
4124 "IPv6 should have higher priority than IPv4"
4125 );
4126
4127 let private_priority =
4129 manager.calculate_quic_discovered_priority(&"192.168.1.1:5678".parse().unwrap());
4130 assert!(
4131 private_priority < base_priority,
4132 "Private addresses should have lower priority"
4133 );
4134
4135 let link_local_priority =
4137 manager.calculate_quic_discovered_priority(&"[fe80::1]:5678".parse().unwrap());
4138 assert!(
4139 link_local_priority < private_priority,
4140 "Link-local should have lower priority than private"
4141 );
4142 }
4143
4144 #[test]
4145 fn test_quic_discovered_addresses_override_stale_server_reflexive() {
4146 let mut manager = create_test_manager();
4148 let peer_id = PeerId([1; 32]);
4149
4150 manager.start_discovery(peer_id, vec![]).unwrap();
4152
4153 let session = manager.active_sessions.get_mut(&peer_id).unwrap();
4155 let old_candidate = DiscoveryCandidate {
4156 address: "1.2.3.4:1234".parse().unwrap(),
4157 priority: 200,
4158 source: DiscoverySourceType::ServerReflexive,
4159 state: CandidateState::Validating,
4160 };
4161 session.discovered_candidates.push(old_candidate);
4162
4163 let new_addr = "1.2.3.4:5678".parse().unwrap();
4165 manager
4166 .accept_quic_discovered_address(peer_id, new_addr)
4167 .unwrap();
4168
4169 let session = manager.active_sessions.get(&peer_id).unwrap();
4171 let candidates: Vec<_> = session
4172 .discovered_candidates
4173 .iter()
4174 .filter(|c| c.source == DiscoverySourceType::ServerReflexive)
4175 .collect();
4176
4177 assert_eq!(
4178 candidates.len(),
4179 2,
4180 "Should have both old and new candidates"
4181 );
4182
4183 let new_candidate = candidates.iter().find(|c| c.address == new_addr).unwrap();
4185 assert_ne!(
4186 new_candidate.priority, 200,
4187 "New candidate should have recalculated priority"
4188 );
4189 }
4190
4191 #[test]
4192 fn test_quic_discovered_address_generates_events() {
4193 let mut manager = create_test_manager();
4195 let peer_id = PeerId([1; 32]);
4196
4197 manager.start_discovery(peer_id, vec![]).unwrap();
4199
4200 manager.poll_discovery_progress(peer_id);
4202
4203 let discovered_addr = "8.8.8.8:5000".parse().unwrap();
4205 manager
4206 .accept_quic_discovered_address(peer_id, discovered_addr)
4207 .unwrap();
4208
4209 let events = manager.poll_discovery_progress(peer_id);
4211
4212 assert!(
4214 !events.is_empty(),
4215 "Should generate events for new QUIC-discovered address"
4216 );
4217
4218 let has_new_candidate = events.iter().any(|e| {
4220 matches!(e,
4221 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
4222 if candidate.address == discovered_addr
4223 )
4224 });
4225 assert!(
4226 has_new_candidate,
4227 "Should generate ServerReflexiveCandidateDiscovered event for the discovered address"
4228 );
4229 }
4230
4231 #[test]
4232 fn test_multiple_quic_discovered_addresses_generate_events() {
4233 let mut manager = create_test_manager();
4235 let peer_id = PeerId([1; 32]);
4236
4237 manager.start_discovery(peer_id, vec![]).unwrap();
4239
4240 manager.poll_discovery_progress(peer_id);
4242
4243 let addresses = vec![
4245 "8.8.8.8:5000".parse().unwrap(),
4246 "1.1.1.1:6000".parse().unwrap(),
4247 "[2001:db8::1]:7000".parse().unwrap(),
4248 ];
4249
4250 for addr in &addresses {
4251 manager
4252 .accept_quic_discovered_address(peer_id, *addr)
4253 .unwrap();
4254 }
4255
4256 let events = manager.poll_discovery_progress(peer_id);
4258
4259 for addr in &addresses {
4261 let has_event = events.iter().any(|e| {
4262 matches!(e,
4263 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
4264 if candidate.address == *addr
4265 )
4266 });
4267 assert!(has_event, "Should have event for address {addr}");
4268 }
4269 }
4270
4271 #[test]
4272 fn test_duplicate_quic_discovered_address_no_event() {
4273 let mut manager = create_test_manager();
4275 let peer_id = PeerId([1; 32]);
4276
4277 manager.start_discovery(peer_id, vec![]).unwrap();
4279
4280 let discovered_addr = "8.8.8.8:5000".parse().unwrap();
4282 manager
4283 .accept_quic_discovered_address(peer_id, discovered_addr)
4284 .unwrap();
4285
4286 manager.poll_discovery_progress(peer_id);
4288
4289 manager
4291 .accept_quic_discovered_address(peer_id, discovered_addr)
4292 .unwrap();
4293
4294 let events = manager.poll_discovery_progress(peer_id);
4296
4297 let has_duplicate_event = events.iter().any(|e| {
4299 matches!(e,
4300 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
4301 if candidate.address == discovered_addr
4302 )
4303 });
4304
4305 assert!(
4306 !has_duplicate_event,
4307 "Should not generate event for duplicate address"
4308 );
4309 }
4310
4311 #[test]
4312 fn test_quic_discovered_address_event_timing() {
4313 let mut manager = create_test_manager();
4315 let peer_id = PeerId([1; 32]);
4316
4317 manager.start_discovery(peer_id, vec![]).unwrap();
4319
4320 manager.poll_discovery_progress(peer_id);
4322
4323 let addr1 = "8.8.8.8:5000".parse().unwrap();
4325 let addr2 = "1.1.1.1:6000".parse().unwrap();
4326
4327 manager
4328 .accept_quic_discovered_address(peer_id, addr1)
4329 .unwrap();
4330 manager
4331 .accept_quic_discovered_address(peer_id, addr2)
4332 .unwrap();
4333
4334 let events = manager.poll_discovery_progress(peer_id);
4337
4338 let server_reflexive_count = events
4340 .iter()
4341 .filter(|e| matches!(e, DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. }))
4342 .count();
4343
4344 assert!(
4345 server_reflexive_count >= 2,
4346 "Should deliver all queued events on poll, got {server_reflexive_count} events"
4347 );
4348
4349 let events2 = manager.poll_discovery_progress(peer_id);
4351 let server_reflexive_count2 = events2
4352 .iter()
4353 .filter(|e| matches!(e, DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. }))
4354 .count();
4355 assert_eq!(
4356 server_reflexive_count2, 0,
4357 "Server reflexive events should not be duplicated on subsequent polls"
4358 );
4359 }
4360
4361 #[test]
4362 fn test_is_valid_local_address() {
4363 let manager = create_test_manager();
4364
4365 assert!(manager.is_valid_local_address(&"192.168.1.1:8080".parse().unwrap()));
4367 assert!(manager.is_valid_local_address(&"10.0.0.1:8080".parse().unwrap()));
4368 assert!(manager.is_valid_local_address(&"172.16.0.1:8080".parse().unwrap()));
4369
4370 assert!(manager.is_valid_local_address(&"[2001:4860:4860::8888]:8080".parse().unwrap()));
4372 assert!(manager.is_valid_local_address(&"[fe80::1]:8080".parse().unwrap())); assert!(manager.is_valid_local_address(&"[fc00::1]:8080".parse().unwrap())); assert!(!manager.is_valid_local_address(&"0.0.0.0:8080".parse().unwrap()));
4377 assert!(!manager.is_valid_local_address(&"255.255.255.255:8080".parse().unwrap()));
4378 assert!(!manager.is_valid_local_address(&"224.0.0.1:8080".parse().unwrap())); assert!(!manager.is_valid_local_address(&"0.0.0.1:8080".parse().unwrap())); assert!(!manager.is_valid_local_address(&"240.0.0.1:8080".parse().unwrap())); assert!(!manager.is_valid_local_address(&"[::]:8080".parse().unwrap())); assert!(!manager.is_valid_local_address(&"[ff02::1]:8080".parse().unwrap())); assert!(!manager.is_valid_local_address(&"[2001:db8::1]:8080".parse().unwrap())); assert!(!manager.is_valid_local_address(&"192.168.1.1:0".parse().unwrap()));
4387
4388 #[cfg(test)]
4390 {
4391 assert!(manager.is_valid_local_address(&"127.0.0.1:8080".parse().unwrap()));
4392 assert!(manager.is_valid_local_address(&"[::1]:8080".parse().unwrap()));
4393 }
4394 }
4395
4396 #[test]
4397 fn test_is_valid_server_reflexive_address() {
4398 let manager = create_test_manager();
4399
4400 assert!(manager.is_valid_server_reflexive_address(&"8.8.8.8:8080".parse().unwrap()));
4402 assert!(manager.is_valid_server_reflexive_address(&"1.1.1.1:53".parse().unwrap()));
4403 assert!(manager.is_valid_server_reflexive_address(&"35.235.1.100:443".parse().unwrap()));
4404
4405 assert!(
4407 manager
4408 .is_valid_server_reflexive_address(&"[2001:4860:4860::8888]:8080".parse().unwrap())
4409 );
4410 assert!(manager.is_valid_server_reflexive_address(
4411 &"[2400:cb00:2048::c629:d7a2]:443".parse().unwrap()
4412 ));
4413
4414 assert!(!manager.is_valid_server_reflexive_address(&"192.168.1.1:8080".parse().unwrap()));
4416 assert!(!manager.is_valid_server_reflexive_address(&"10.0.0.1:8080".parse().unwrap()));
4417 assert!(!manager.is_valid_server_reflexive_address(&"172.16.0.1:8080".parse().unwrap()));
4418
4419 assert!(!manager.is_valid_server_reflexive_address(&"127.0.0.1:8080".parse().unwrap()));
4421 assert!(!manager.is_valid_server_reflexive_address(&"169.254.1.1:8080".parse().unwrap())); assert!(!manager.is_valid_server_reflexive_address(&"0.0.0.0:8080".parse().unwrap()));
4423 assert!(
4424 !manager.is_valid_server_reflexive_address(&"255.255.255.255:8080".parse().unwrap())
4425 );
4426 assert!(!manager.is_valid_server_reflexive_address(&"224.0.0.1:8080".parse().unwrap()));
4427
4428 assert!(!manager.is_valid_server_reflexive_address(&"[::1]:8080".parse().unwrap()));
4430 assert!(!manager.is_valid_server_reflexive_address(&"[fe80::1]:8080".parse().unwrap())); assert!(!manager.is_valid_server_reflexive_address(&"[fc00::1]:8080".parse().unwrap())); assert!(!manager.is_valid_server_reflexive_address(&"[ff02::1]:8080".parse().unwrap())); assert!(!manager.is_valid_server_reflexive_address(&"8.8.8.8:0".parse().unwrap()));
4436 }
4437
4438 #[test]
4441 fn test_validation_rejects_invalid_addresses() {
4442 let manager = create_test_manager();
4443
4444 let invalid_server_reflexive = vec![
4448 "0.0.0.0:8080", "255.255.255.255:8080", "224.0.0.1:8080", "192.168.1.1:0", "127.0.0.1:8080", "10.0.0.1:8080", "[::]:8080", "[fe80::1]:8080", ];
4457
4458 for addr_str in invalid_server_reflexive {
4459 let addr: SocketAddr = addr_str.parse().unwrap();
4460 assert!(
4461 !manager.is_valid_server_reflexive_address(&addr),
4462 "Address {} should be invalid for server reflexive",
4463 addr_str
4464 );
4465 }
4466
4467 let valid_server_reflexive = vec![
4469 "8.8.8.8:8080",
4470 "1.1.1.1:53",
4471 "35.235.1.100:443",
4472 "[2001:4860:4860::8888]:8080",
4473 ];
4474
4475 for addr_str in valid_server_reflexive {
4476 let addr: SocketAddr = addr_str.parse().unwrap();
4477 assert!(
4478 manager.is_valid_server_reflexive_address(&addr),
4479 "Address {} should be valid for server reflexive",
4480 addr_str
4481 );
4482 }
4483 }
4484
4485 #[test]
4486 fn test_candidate_validation_error_types() {
4487 use crate::nat_traversal_api::{CandidateAddress, CandidateValidationError};
4488
4489 assert!(matches!(
4491 CandidateAddress::validate_address(&"192.168.1.1:0".parse().unwrap()),
4492 Err(CandidateValidationError::InvalidPort(0))
4493 ));
4494
4495 assert!(matches!(
4496 CandidateAddress::validate_address(&"0.0.0.0:8080".parse().unwrap()),
4497 Err(CandidateValidationError::UnspecifiedAddress)
4498 ));
4499
4500 assert!(matches!(
4501 CandidateAddress::validate_address(&"255.255.255.255:8080".parse().unwrap()),
4502 Err(CandidateValidationError::BroadcastAddress)
4503 ));
4504
4505 assert!(matches!(
4506 CandidateAddress::validate_address(&"224.0.0.1:8080".parse().unwrap()),
4507 Err(CandidateValidationError::MulticastAddress)
4508 ));
4509
4510 assert!(matches!(
4511 CandidateAddress::validate_address(&"240.0.0.1:8080".parse().unwrap()),
4512 Err(CandidateValidationError::ReservedAddress)
4513 ));
4514
4515 assert!(matches!(
4516 CandidateAddress::validate_address(&"[2001:db8::1]:8080".parse().unwrap()),
4517 Err(CandidateValidationError::DocumentationAddress)
4518 ));
4519
4520 assert!(matches!(
4521 CandidateAddress::validate_address(&"[::ffff:192.168.1.1]:8080".parse().unwrap()),
4522 Err(CandidateValidationError::IPv4MappedAddress)
4523 ));
4524 }
4525}