1#![allow(missing_docs)]
8
9use std::{
18 collections::{HashMap, VecDeque},
19 net::{IpAddr, SocketAddr},
20 sync::Arc,
21 time::{Duration, Instant},
22};
23
24use tracing::{debug, error, info, warn};
25
26use crate::Connection;
27
28use crate::{
29 connection::nat_traversal::{CandidateSource, CandidateState},
30 nat_traversal_api::{BootstrapNode, CandidateAddress, PeerId},
31};
32
33#[cfg(all(target_os = "windows", feature = "network-discovery"))]
35pub mod windows;
36
37#[cfg(all(target_os = "windows", feature = "network-discovery"))]
38pub use windows::WindowsInterfaceDiscovery;
39
40#[cfg(all(target_os = "linux", feature = "network-discovery"))]
41pub mod linux;
42
43#[cfg(all(target_os = "linux", feature = "network-discovery"))]
44pub use linux::LinuxInterfaceDiscovery;
45
46#[cfg(all(target_os = "macos", feature = "network-discovery"))]
47pub mod macos;
48
49#[cfg(all(target_os = "macos", feature = "network-discovery"))]
50pub use macos::MacOSInterfaceDiscovery;
51
52fn convert_to_nat_source(discovery_source: DiscoverySourceType) -> CandidateSource {
54 match discovery_source {
55 DiscoverySourceType::Local => CandidateSource::Local,
56 DiscoverySourceType::ServerReflexive => CandidateSource::Observed { by_node: None },
57 DiscoverySourceType::Predicted => CandidateSource::Predicted,
58 }
59}
60
/// Origin of a candidate address as tracked by the discovery code.
///
/// Converted to the NAT traversal layer's `CandidateSource` by
/// `convert_to_nat_source`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DiscoverySourceType {
    /// Used for addresses taken from local interfaces and for the configured
    /// bound address; maps to `CandidateSource::Local`.
    Local,

    /// Maps to `CandidateSource::Observed { by_node: None }`.
    ServerReflexive,

    /// Maps to `CandidateSource::Predicted`.
    Predicted,
}
85
/// Candidate address collected during a discovery session, before it is
/// turned into a `ValidatedCandidate` or a `CandidateAddress`.
#[derive(Debug, Clone)]
pub(crate) struct DiscoveryCandidate {
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// Priority value carried over unchanged into `CandidateAddress`.
    pub priority: u32,
    /// Where the candidate came from.
    pub source: DiscoverySourceType,
    /// Validation state; created as `CandidateState::New` and set to
    /// `CandidateState::Valid` by `handle_path_response`.
    pub state: CandidateState,
}
94
95impl DiscoveryCandidate {
96 pub(crate) fn to_candidate_address(&self) -> CandidateAddress {
98 CandidateAddress {
99 address: self.address,
100 priority: self.priority,
101 source: convert_to_nat_source(self.source),
102 state: self.state,
103 }
104 }
105}
106
/// Per-peer discovery state held in `CandidateDiscoveryManager::active_sessions`.
#[derive(Debug)]
pub struct DiscoverySession {
    /// Peer this session was started for.
    #[allow(dead_code)]
    peer_id: PeerId,
    /// Random identifier assigned in `DiscoverySession::new`.
    #[allow(dead_code)]
    session_id: u64,
    /// Phase the session is currently in; starts as `DiscoveryPhase::Idle`.
    current_phase: DiscoveryPhase,
    /// Instant at which the session was created.
    started_at: Instant,
    /// Candidates collected so far.
    discovered_candidates: Vec<DiscoveryCandidate>,
    /// Counters accumulated while the session runs.
    statistics: DiscoveryStatistics,
    /// Created empty and not written by the code visible in this part of the
    /// file. NOTE(review): presumably history for port prediction — confirm.
    #[allow(dead_code)]
    allocation_history: VecDeque<PortAllocationEvent>,
}
129
/// Owns the discovery configuration, the platform interface scanner and all
/// per-peer discovery sessions.
pub struct CandidateDiscoveryManager {
    /// Discovery settings; `bound_address` can be changed later through
    /// `set_bound_address`.
    config: DiscoveryConfig,
    /// Platform interface scanner, shared behind a standard (non-reentrant)
    /// mutex.
    interface_discovery: Arc<std::sync::Mutex<Box<dyn NetworkInterfaceDiscovery + Send>>>,
    /// Built from the configuration in `new`; not read by the code visible in
    /// this part of the file.
    #[allow(dead_code)]
    cache: DiscoveryCache,
    /// Running sessions, keyed by peer.
    active_sessions: HashMap<PeerId, DiscoverySession>,
    /// Result of the last local interface scan together with the instant it
    /// was stored; cleared when the bound address changes.
    cached_local_candidates: Option<(Instant, Vec<ValidatedCandidate>)>,
    /// Maximum age of `cached_local_candidates`; initialised from
    /// `DiscoveryConfig::interface_cache_ttl`.
    #[allow(dead_code)]
    local_cache_duration: Duration,
    /// Outstanding path validations, keyed by candidate id.
    pending_validations: HashMap<CandidateId, PendingValidation>,
}
151
/// Tunable settings for candidate discovery.
///
/// The defaults quoted below come from the `Default` implementation.
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
    /// Default: 30 s. NOTE(review): not consulted by the code visible in this
    /// part of the file — presumably the overall discovery budget.
    pub total_timeout: Duration,
    /// How long a session may stay in local interface scanning before `poll`
    /// completes it. Default: 2 s.
    pub local_scan_timeout: Duration,
    /// Default: 5 s. NOTE(review): presumably a per-bootstrap-query timeout —
    /// confirm against the query code.
    pub bootstrap_query_timeout: Duration,
    /// Default: 3.
    pub max_query_retries: u32,
    /// Default: 8. NOTE(review): no limit is enforced in the visible code.
    pub max_candidates: usize,
    /// Default: `true`.
    pub enable_symmetric_prediction: bool,
    /// Number of server-reflexive responses `should_transition_to_prediction`
    /// requires (treated as at least 1). Default: 2.
    pub min_bootstrap_consensus: usize,
    /// Copied into the manager's local-candidate cache lifetime in
    /// `CandidateDiscoveryManager::new`. Default: 60 s.
    pub interface_cache_ttl: Duration,
    /// Default: 300 s.
    pub server_reflexive_cache_ttl: Duration,
    /// Locally bound socket address, if known; when set it is offered as a
    /// local candidate. Default: `None`.
    pub bound_address: Option<SocketAddr>,
}
176
/// Phase a `DiscoverySession` is in, together with the data that phase needs.
#[derive(Debug, Clone, PartialEq)]
#[allow(missing_docs)]
pub enum DiscoveryPhase {
    /// Initial phase of a freshly created session.
    Idle,
    /// Local interfaces are being scanned; `started_at` is when the phase began.
    LocalInterfaceScanning { started_at: Instant },
    /// NOTE(review): never constructed in the visible code — presumably the
    /// bootstrap-query phase.
    ServerReflexiveQuerying {
        started_at: Instant,
        // Per-bootstrap-node query state.
        active_queries: HashMap<BootstrapNodeId, QueryState>,
        responses_received: Vec<ServerReflexiveResponse>,
    },
    /// Entered by `start_session_candidate_validation` with an empty result map.
    CandidateValidation {
        started_at: Instant,
        validation_results: HashMap<CandidateId, ValidationResult>,
    },
    /// Discovery finished; carries the final candidate list and finish time.
    Completed {
        final_candidates: Vec<ValidatedCandidate>,
        completion_time: Instant,
    },
    /// Discovery failed; carries the error, failure time and suggested fallbacks.
    Failed {
        error: DiscoveryError,
        failed_at: Instant,
        fallback_options: Vec<FallbackStrategy>,
    },
}
212
/// Events reported to the caller by the discovery manager.
///
/// NOTE(review): only some variants are emitted by the code visible in this
/// part of the file; the rest are documented from their fields alone.
#[derive(Debug, Clone)]
pub enum DiscoveryEvent {
    /// NOTE(review): not emitted in the visible code.
    DiscoveryStarted {
        peer_id: PeerId,
        bootstrap_count: usize,
    },
    /// A local interface scan was started successfully.
    LocalScanningStarted,
    /// A local candidate was added to a session.
    LocalCandidateDiscovered { candidate: CandidateAddress },
    /// Local scanning ended, either normally or by timeout.
    LocalScanningCompleted {
        candidate_count: usize,
        duration: Duration,
    },
    /// NOTE(review): not emitted in the visible code.
    ServerReflexiveDiscoveryStarted { bootstrap_count: usize },
    /// NOTE(review): not emitted in the visible code.
    ServerReflexiveCandidateDiscovered {
        candidate: CandidateAddress,
        bootstrap_node: SocketAddr,
    },
    /// NOTE(review): not emitted in the visible code.
    BootstrapQueryFailed {
        bootstrap_node: SocketAddr,
        error: String,
    },
    /// NOTE(review): not emitted in the visible code.
    PortAllocationDetected {
        port: u16,
        source_address: SocketAddr,
        bootstrap_node: BootstrapNodeId,
        timestamp: Instant,
    },
    /// A session finished; `success_rate` is 1.0 or 0.0 in the visible code.
    DiscoveryCompleted {
        candidate_count: usize,
        total_duration: Duration,
        success_rate: f64,
    },
    /// A session failed; `partial_results` holds whatever was found so far.
    DiscoveryFailed {
        error: DiscoveryError,
        partial_results: Vec<CandidateAddress>,
    },
    /// Asks the caller to send a path challenge carrying `challenge_token`
    /// to `candidate_address`.
    PathValidationRequested {
        candidate_id: CandidateId,
        candidate_address: SocketAddr,
        challenge_token: u64,
    },
    /// A matching path response arrived; `rtt` is measured from the request.
    PathValidationResponse {
        candidate_id: CandidateId,
        candidate_address: SocketAddr,
        challenge_token: u64,
        rtt: Duration,
    },
}
279
/// Numeric identifier of a bootstrap node; usable as a hash-map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BootstrapNodeId(pub u64);
283
/// Bookkeeping for one outstanding path validation.
struct PendingValidation {
    /// Address the challenge was requested for.
    candidate_address: SocketAddr,
    /// Random token a response must echo to be accepted.
    challenge_token: u64,
    /// When the validation was requested; used to compute the RTT.
    started_at: Instant,
    /// Set to 1 on creation and not updated in the visible code.
    #[allow(dead_code)]
    attempts: u32,
}
296
/// State of a single bootstrap query.
///
/// NOTE(review): not constructed in the visible code; variant meanings are
/// taken from their names and fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryState {
    /// Query sent and awaiting a reply; records send time and attempt count.
    Pending { sent_at: Instant, attempts: u32 },
    /// Query finished.
    Completed,
    /// Query failed.
    Failed,
}
307
/// One bootstrap node's report of the address it observed for this endpoint.
#[derive(Debug, Clone, PartialEq)]
pub struct ServerReflexiveResponse {
    /// Node that produced the response.
    pub bootstrap_node: BootstrapNodeId,
    /// Address reported by the node; counted by `calculate_consensus_address`.
    pub observed_address: SocketAddr,
    /// Response latency; adjusts the priority in
    /// `calculate_server_reflexive_priority`.
    pub response_time: Duration,
    /// When the response was recorded; responses younger than 60 s receive a
    /// priority bonus.
    pub timestamp: Instant,
}
316
/// A single observed port allocation, kept in per-session history.
#[derive(Debug, Clone, PartialEq)]
pub struct PortAllocationEvent {
    /// Observed port number.
    pub port: u16,
    /// When the allocation was observed.
    pub timestamp: Instant,
    /// NOTE(review): presumably the address the allocation was observed on —
    /// confirm against the producer.
    pub source_address: SocketAddr,
}
326
/// Description of how a NAT appears to hand out ports.
#[derive(Debug, Clone, PartialEq)]
pub struct PortAllocationPattern {
    /// Kind of pattern detected.
    pub pattern_type: AllocationPatternType,
    /// NOTE(review): presumably the port predictions start from — confirm.
    pub base_port: u16,
    /// Port difference compared between consecutive allocations for
    /// `AllocationPatternType::FixedStride`.
    pub stride: u16,
    /// Inclusive `(min, max)` port range checked for
    /// `AllocationPatternType::PoolBased`.
    pub pool_boundaries: Option<(u16, u16)>,
    /// NOTE(review): presumably in `0.0..=1.0` — confirm.
    pub confidence: f64,
}
336
/// Classification of a NAT's port allocation behaviour.
///
/// NOTE(review): meanings below are inferred from the names and from how
/// `calculate_prediction_accuracy` treats the first, second and fourth variants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AllocationPatternType {
    /// Consecutive allocations differ by exactly 1.
    Sequential,
    /// Consecutive allocations differ by `PortAllocationPattern::stride`.
    FixedStride,
    /// No predictable relation between allocations.
    Random,
    /// Ports fall inside `PortAllocationPattern::pool_boundaries`.
    PoolBased,
    /// Allocation appears to depend on time.
    TimeBased,
    /// Pattern not determined.
    Unknown,
}
353
/// Numeric identifier of a candidate; usable as a hash-map key.
///
/// Most call sites in this file generate the value with `rand::random()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CandidateId(pub u64);
359
/// Outcome of validating a single candidate.
#[derive(Debug, Clone, PartialEq)]
pub enum ValidationResult {
    /// Candidate is reachable; `rtt` is the round-trip time recorded for it.
    Valid { rtt: Duration },
    /// Candidate was rejected; `reason` is a human-readable explanation.
    Invalid { reason: String },
    /// NOTE(review): not produced in the visible code — presumably no
    /// response arrived in time.
    Timeout,
    /// NOTE(review): not produced in the visible code — presumably validation
    /// is still in progress.
    Pending,
}
368
/// Candidate as exposed in discovery results and in the local-candidate cache.
#[derive(Debug, Clone, PartialEq)]
pub struct ValidatedCandidate {
    /// Identifier of the candidate.
    pub id: CandidateId,
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// Where the candidate came from.
    pub source: DiscoverySourceType,
    /// Priority value carried over unchanged into `CandidateAddress`.
    pub priority: u32,
    /// Measured round-trip time, if any; the visible code always stores `None`.
    pub rtt: Option<Duration>,
    /// Score set to 1.0 for completed sessions and 0.5 for candidates
    /// salvaged from failed sessions.
    pub reliability_score: f64,
}
379
380impl ValidatedCandidate {
381 pub fn to_candidate_address(&self) -> CandidateAddress {
383 CandidateAddress {
384 address: self.address,
385 priority: self.priority,
386 source: convert_to_nat_source(self.source),
387 state: CandidateState::Valid,
388 }
389 }
390}
391
/// Crate-internal bundle with the same data fields as `DiscoverySession`
/// minus the current phase.
///
/// NOTE(review): not referenced by the code visible in this part of the file.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct DiscoverySessionState {
    /// Peer the session belongs to.
    pub peer_id: PeerId,
    /// Session identifier.
    pub session_id: u64,
    /// When the session started.
    pub started_at: Instant,
    /// Candidates collected so far.
    pub discovered_candidates: Vec<DiscoveryCandidate>,
    /// Counters accumulated for the session.
    pub statistics: DiscoveryStatistics,
    /// Recorded port allocation events.
    pub allocation_history: VecDeque<PortAllocationEvent>,
}
403
/// Counters and timings collected during discovery; all start at zero/`None`.
///
/// NOTE(review): the visible code only updates `local_candidates_found` and
/// `total_discovery_time`; the other fields are documented from their names.
#[derive(Debug, Default, Clone)]
pub struct DiscoveryStatistics {
    /// Incremented once per local candidate added to a session.
    pub local_candidates_found: u32,
    /// Number of server-reflexive candidates found.
    pub server_reflexive_candidates_found: u32,
    /// Number of predicted candidates generated.
    pub predicted_candidates_generated: u32,
    /// Number of bootstrap queries sent.
    pub bootstrap_queries_sent: u32,
    /// Number of bootstrap queries that succeeded.
    pub bootstrap_queries_successful: u32,
    /// Time from session start to completion, set when a session completes.
    pub total_discovery_time: Option<Duration>,
    /// Average bootstrap round-trip time, if measured.
    pub average_bootstrap_rtt: Option<Duration>,
    /// Number of addresses rejected as invalid.
    pub invalid_addresses_rejected: u32,
}
416
/// Errors reported by candidate discovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryError {
    /// A completed interface scan yielded no addresses.
    NoLocalInterfaces,

    /// NOTE(review): not produced in the visible code — presumably every
    /// bootstrap query failed.
    AllBootstrapsFailed,

    /// Discovery did not finish within its time limit.
    DiscoveryTimeout,

    /// NOTE(review): not produced in the visible code — presumably fewer
    /// candidates were found than required.
    InsufficientCandidates {
        found: usize,
        required: usize,
    },

    /// Interface scanning could not be started or its lock was unusable;
    /// carries a description.
    NetworkError(String),
    /// NOTE(review): not produced in the visible code.
    ConfigurationError(String),
    /// Internal misuse, e.g. starting discovery twice for the same peer.
    InternalError(String),
}
462
/// Recovery options attached to `DiscoveryPhase::Failed`.
///
/// NOTE(review): only `UseCachedResults` is used in the visible code; the
/// other meanings are taken from the variant names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FallbackStrategy {
    /// Suggested when a session times out.
    UseCachedResults,
    /// Retry discovery with less strict parameters.
    RetryWithRelaxedParams,
    /// Proceed with whatever minimal candidate set is available.
    UseMinimalCandidates,
    /// Fall back to relaying.
    EnableRelayFallback,
}
475
476impl Default for DiscoveryConfig {
477 fn default() -> Self {
478 Self {
479 total_timeout: Duration::from_secs(30),
480 local_scan_timeout: Duration::from_secs(2),
481 bootstrap_query_timeout: Duration::from_secs(5),
482 max_query_retries: 3,
483 max_candidates: 8,
484 enable_symmetric_prediction: true,
485 min_bootstrap_consensus: 2,
486 interface_cache_ttl: Duration::from_secs(60),
487 server_reflexive_cache_ttl: Duration::from_secs(300),
488 bound_address: None,
489 }
490 }
491}
492
493impl DiscoverySession {
494 fn new(peer_id: PeerId, _config: &DiscoveryConfig) -> Self {
496 Self {
497 peer_id,
498 session_id: rand::random(),
499 current_phase: DiscoveryPhase::Idle,
500 started_at: Instant::now(),
501 discovered_candidates: Vec::new(),
502 statistics: DiscoveryStatistics::default(),
503 allocation_history: VecDeque::new(),
504 }
505 }
506}
507
508impl CandidateDiscoveryManager {
509 pub fn new(config: DiscoveryConfig) -> Self {
511 let interface_discovery =
512 Arc::new(std::sync::Mutex::new(create_platform_interface_discovery()));
513 let cache = DiscoveryCache::new(&config);
514 let local_cache_duration = config.interface_cache_ttl;
515
516 Self {
517 config,
518 interface_discovery,
519 cache,
520 active_sessions: HashMap::new(),
521 cached_local_candidates: None,
522 local_cache_duration,
523 pending_validations: HashMap::new(),
524 }
525 }
526
527 pub fn set_bound_address(&mut self, address: SocketAddr) {
529 self.config.bound_address = Some(address);
530 self.cached_local_candidates = None;
532 }
533
534 #[allow(clippy::panic)]
536 pub fn discover_local_candidates(&mut self) -> Result<Vec<ValidatedCandidate>, DiscoveryError> {
537 let mut interface_discovery = self
539 .interface_discovery
540 .lock()
541 .map_err(|e| DiscoveryError::NetworkError(format!("Mutex poisoned: {e}")))?;
542 interface_discovery.start_scan().map_err(|e| {
543 DiscoveryError::NetworkError(format!("Failed to start interface scan: {e}"))
544 })?;
545
546 let start = Instant::now();
548 let timeout = Duration::from_secs(2);
549
550 loop {
551 if start.elapsed() > timeout {
552 return Err(DiscoveryError::DiscoveryTimeout);
553 }
554
555 if let Some(interfaces) = self
556 .interface_discovery
557 .lock()
558 .unwrap_or_else(|_| panic!("interface discovery mutex should be valid"))
559 .check_scan_complete()
560 {
561 let mut candidates = Vec::new();
563
564 for interface in interfaces {
565 for addr in interface.addresses {
566 candidates.push(ValidatedCandidate {
567 id: CandidateId(rand::random()),
568 address: addr,
569 source: DiscoverySourceType::Local,
570 priority: 50000, rtt: None,
572 reliability_score: 1.0,
573 });
574 }
575 }
576
577 if candidates.is_empty() {
578 return Err(DiscoveryError::NoLocalInterfaces);
579 }
580
581 return Ok(candidates);
582 }
583
584 std::thread::sleep(Duration::from_millis(10));
586 }
587 }
588
589 pub fn start_discovery(
591 &mut self,
592 peer_id: PeerId,
593 _bootstrap_nodes: Vec<BootstrapNode>,
594 ) -> Result<(), DiscoveryError> {
595 if self.active_sessions.contains_key(&peer_id) {
597 return Err(DiscoveryError::InternalError(format!(
598 "Discovery already in progress for peer {peer_id:?}"
599 )));
600 }
601
602 info!("Starting candidate discovery for peer {:?}", peer_id);
603
604 let mut session = DiscoverySession::new(peer_id, &self.config);
606
607 session.current_phase = DiscoveryPhase::LocalInterfaceScanning {
609 started_at: Instant::now(),
610 };
611
612 self.active_sessions.insert(peer_id, session);
614
615 Ok(())
616 }
617
618 pub fn poll(&mut self, now: Instant) -> Vec<DiscoveryEvent> {
620 let mut all_events = Vec::new();
621 let mut completed_sessions = Vec::new();
622
623 let mut local_scan_events = Vec::new();
626 for (peer_id, session) in &mut self.active_sessions {
627 if let DiscoveryPhase::LocalInterfaceScanning { started_at } = &session.current_phase {
628 if started_at.elapsed() > self.config.local_scan_timeout {
630 local_scan_events.push((
631 *peer_id,
632 DiscoveryEvent::LocalScanningCompleted {
633 candidate_count: 0,
634 duration: started_at.elapsed(),
635 },
636 ));
637 }
638 }
639 }
640
641 for (peer_id, event) in local_scan_events {
643 all_events.push(event);
644 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
645 session.current_phase = DiscoveryPhase::Completed {
647 final_candidates: session
648 .discovered_candidates
649 .iter()
650 .map(|dc| ValidatedCandidate {
651 id: CandidateId(0),
652 address: dc.address,
653 source: dc.source,
654 priority: dc.priority,
655 rtt: None,
656 reliability_score: 1.0,
657 })
658 .collect(),
659 completion_time: now,
660 };
661
662 all_events.push(DiscoveryEvent::DiscoveryCompleted {
663 candidate_count: session.discovered_candidates.len(),
664 total_duration: now.duration_since(session.started_at),
665 success_rate: 1.0,
666 });
667
668 completed_sessions.push(peer_id);
669 }
670 }
671
672 for peer_id in completed_sessions {
674 self.active_sessions.remove(&peer_id);
675 debug!("Removed completed discovery session for peer {:?}", peer_id);
676 }
677
678 all_events
679 }
680
681 pub fn get_status(&self) -> DiscoveryStatus {
683 DiscoveryStatus {
685 phase: DiscoveryPhase::Idle,
686 discovered_candidates: Vec::new(),
687 statistics: DiscoveryStatistics::default(),
688 elapsed_time: Duration::from_secs(0),
689 }
690 }
691
692 pub fn is_complete(&self) -> bool {
694 self.active_sessions.values().all(|session| {
696 matches!(
697 session.current_phase,
698 DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. }
699 )
700 })
701 }
702
703 pub fn get_results(&self) -> Option<DiscoveryResults> {
705 if self.active_sessions.is_empty() {
707 return None;
708 }
709
710 let mut all_candidates = Vec::new();
712 let mut latest_completion = Instant::now();
713 let mut combined_stats = DiscoveryStatistics::default();
714
715 for session in self.active_sessions.values() {
716 match &session.current_phase {
717 DiscoveryPhase::Completed {
718 final_candidates,
719 completion_time,
720 } => {
721 all_candidates.extend(final_candidates.clone());
723 latest_completion = *completion_time;
724 combined_stats.local_candidates_found +=
726 session.statistics.local_candidates_found;
727 combined_stats.server_reflexive_candidates_found +=
728 session.statistics.server_reflexive_candidates_found;
729 combined_stats.predicted_candidates_generated +=
730 session.statistics.predicted_candidates_generated;
731 combined_stats.bootstrap_queries_sent +=
732 session.statistics.bootstrap_queries_sent;
733 combined_stats.bootstrap_queries_successful +=
734 session.statistics.bootstrap_queries_successful;
735 }
736 DiscoveryPhase::Failed { .. } => {
737 let validated: Vec<ValidatedCandidate> = session
740 .discovered_candidates
741 .iter()
742 .enumerate()
743 .map(|(idx, dc)| ValidatedCandidate {
744 id: CandidateId(idx as u64),
745 address: dc.address,
746 source: dc.source,
747 priority: dc.priority,
748 rtt: None,
749 reliability_score: 0.5, })
751 .collect();
752 all_candidates.extend(validated);
753 }
754 _ => {}
755 }
756 }
757
758 if all_candidates.is_empty() {
759 None
760 } else {
761 Some(DiscoveryResults {
762 candidates: all_candidates,
763 completion_time: latest_completion,
764 statistics: combined_stats,
765 })
766 }
767 }
768
769 pub fn get_candidates_for_peer(&self, peer_id: PeerId) -> Vec<CandidateAddress> {
771 if let Some(session) = self.active_sessions.get(&peer_id) {
773 session
775 .discovered_candidates
776 .iter()
777 .map(|c| c.to_candidate_address())
778 .collect()
779 } else {
780 debug!("No active discovery session found for peer {:?}", peer_id);
782 Vec::new()
783 }
784 }
785
    /// Drives one session through the local interface scanning phase.
    ///
    /// In order: reuse cached local candidates if they are still fresh; start
    /// a scan if the phase has only just begun; give up via the timeout
    /// handler once `config.local_scan_timeout` has passed; otherwise check
    /// whether the scan has finished and process its interfaces.
    ///
    /// Errors (poisoned lock, scan start failure) are logged and end this poll
    /// without changing the session.
    #[allow(dead_code)]
    fn poll_session_local_scanning(
        &mut self,
        session: &mut DiscoverySession,
        started_at: Instant,
        now: Instant,
        events: &mut Vec<DiscoveryEvent>,
    ) {
        // Fast path: a previous scan result that has not yet expired.
        if let Some((cache_time, ref cached_candidates)) = self.cached_local_candidates {
            if cache_time.elapsed() < self.local_cache_duration {
                debug!(
                    "Using cached local candidates for peer {:?}",
                    session.peer_id
                );
                self.process_cached_local_candidates(
                    session,
                    cached_candidates.clone(),
                    events,
                    now,
                );
                return;
            }
        }

        // The scan is kicked off only while the phase is younger than 10 ms.
        // NOTE(review): this is a wall-clock heuristic for "first poll"; a
        // first poll arriving later than that never starts a scan.
        if started_at.elapsed().as_millis() < 10 {
            let scan_result = match self.interface_discovery.lock() {
                Ok(mut interface_discovery) => interface_discovery.start_scan(),
                Err(e) => {
                    error!("Interface discovery mutex poisoned: {}", e);
                    return;
                }
            };
            match scan_result {
                Ok(()) => {
                    debug!(
                        "Started local interface scan for peer {:?}",
                        session.peer_id
                    );
                    events.push(DiscoveryEvent::LocalScanningStarted);
                }
                Err(e) => {
                    error!("Failed to start interface scan: {}", e);
                    return;
                }
            }
        }

        // Timeout is measured with the wall clock, not with `now`.
        if started_at.elapsed() > self.config.local_scan_timeout {
            warn!(
                "Local interface scanning timeout for peer {:?}",
                session.peer_id
            );
            self.handle_session_local_scan_timeout(session, events, now);
            return;
        }

        // The lock guard is released at the end of the `match`, before the
        // interfaces are processed.
        let scan_complete_result = match self.interface_discovery.lock() {
            Ok(mut interface_discovery) => interface_discovery.check_scan_complete(),
            Err(e) => {
                error!("Interface discovery mutex poisoned: {}", e);
                return;
            }
        };
        if let Some(interfaces) = scan_complete_result {
            self.process_session_local_interfaces(session, interfaces, events, now);
        }
    }
861
    /// Turns the interfaces reported by a finished scan into local candidates
    /// for `session`, caches them, and completes the session.
    ///
    /// The configured bound address (if acceptable) is added first with
    /// priority 60000; interface addresses follow with a computed priority,
    /// skipping the bound address and anything `is_valid_local_address`
    /// rejects. Each accepted address produces a `LocalCandidateDiscovered`
    /// event; a `LocalScanningCompleted` event follows, and the session is
    /// then completed.
    #[allow(dead_code)]
    fn process_session_local_interfaces(
        &mut self,
        session: &mut DiscoverySession,
        interfaces: Vec<NetworkInterface>,
        events: &mut Vec<DiscoveryEvent>,
        now: Instant,
    ) {
        debug!(
            "Processing {} network interfaces for peer {:?}",
            interfaces.len(),
            session.peer_id
        );

        // Collected separately so they can be stored in the manager-wide cache.
        let mut validated_candidates = Vec::new();

        if let Some(bound_addr) = self.config.bound_address {
            // Loopback is accepted for the bound address even though the
            // general validity check rejects it outside tests.
            if self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback() {
                let candidate = DiscoveryCandidate {
                    address: bound_addr,
                    priority: 60000,
                    source: DiscoverySourceType::Local,
                    state: CandidateState::New,
                };

                session.discovered_candidates.push(candidate.clone());
                session.statistics.local_candidates_found += 1;

                validated_candidates.push(ValidatedCandidate {
                    id: CandidateId(rand::random()),
                    address: bound_addr,
                    source: DiscoverySourceType::Local,
                    priority: candidate.priority,
                    rtt: None,
                    reliability_score: 1.0,
                });

                events.push(DiscoveryEvent::LocalCandidateDiscovered {
                    candidate: candidate.to_candidate_address(),
                });

                debug!(
                    "Added bound address {} as local candidate for peer {:?}",
                    bound_addr, session.peer_id
                );
            }
        }

        for interface in &interfaces {
            for address in &interface.addresses {
                // Skip the bound address; it was considered above.
                if Some(*address) == self.config.bound_address {
                    continue;
                }

                if self.is_valid_local_address(address) {
                    let candidate = DiscoveryCandidate {
                        address: *address,
                        priority: self.calculate_local_priority(address, interface),
                        source: DiscoverySourceType::Local,
                        state: CandidateState::New,
                    };

                    session.discovered_candidates.push(candidate.clone());
                    session.statistics.local_candidates_found += 1;

                    validated_candidates.push(ValidatedCandidate {
                        id: CandidateId(rand::random()),
                        address: *address,
                        source: DiscoverySourceType::Local,
                        priority: candidate.priority,
                        rtt: None,
                        reliability_score: 1.0,
                    });

                    events.push(DiscoveryEvent::LocalCandidateDiscovered {
                        candidate: candidate.to_candidate_address(),
                    });
                }
            }
        }

        // Remember the result so later sessions can skip the scan.
        self.cached_local_candidates = Some((now, validated_candidates));

        events.push(DiscoveryEvent::LocalScanningCompleted {
            candidate_count: session.statistics.local_candidates_found as usize,
            duration: now.duration_since(session.started_at),
        });

        self.complete_session_discovery_with_local_candidates(session, events, now);
    }
959
960 #[allow(dead_code)]
961 fn process_cached_local_candidates(
962 &mut self,
963 session: &mut DiscoverySession,
964 mut cached_candidates: Vec<ValidatedCandidate>,
965 events: &mut Vec<DiscoveryEvent>,
966 now: Instant,
967 ) {
968 if let Some(bound_addr) = self.config.bound_address {
970 let has_bound_addr = cached_candidates.iter().any(|c| c.address == bound_addr);
971 if !has_bound_addr
972 && (self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback())
973 {
974 cached_candidates.insert(
975 0,
976 ValidatedCandidate {
977 id: CandidateId(rand::random()),
978 address: bound_addr,
979 source: DiscoverySourceType::Local,
980 priority: 60000, rtt: None,
982 reliability_score: 1.0,
983 },
984 );
985 }
986 }
987
988 debug!(
989 "Using {} cached local candidates for peer {:?}",
990 cached_candidates.len(),
991 session.peer_id
992 );
993
994 for validated in cached_candidates {
995 let candidate = DiscoveryCandidate {
996 address: validated.address,
997 priority: validated.priority,
998 source: validated.source,
999 state: CandidateState::New,
1000 };
1001
1002 session.discovered_candidates.push(candidate.clone());
1003 session.statistics.local_candidates_found += 1;
1004
1005 events.push(DiscoveryEvent::LocalCandidateDiscovered {
1006 candidate: candidate.to_candidate_address(),
1007 });
1008 }
1009
1010 events.push(DiscoveryEvent::LocalScanningCompleted {
1011 candidate_count: session.statistics.local_candidates_found as usize,
1012 duration: now.duration_since(session.started_at),
1013 });
1014
1015 self.complete_session_discovery_with_local_candidates(session, events, now);
1017 }
1018
1019 #[allow(dead_code)]
1026 fn start_session_candidate_validation(
1027 &mut self,
1028 session: &mut DiscoverySession,
1029 _events: &mut Vec<DiscoveryEvent>,
1030 now: Instant,
1031 ) {
1032 debug!(
1033 "Starting candidate validation for {} candidates",
1034 session.discovered_candidates.len()
1035 );
1036
1037 session.current_phase = DiscoveryPhase::CandidateValidation {
1038 started_at: now,
1039 validation_results: HashMap::new(),
1040 };
1041 }
1042
1043 #[allow(dead_code)]
1045 fn start_path_validation(
1046 &mut self,
1047 candidate_id: CandidateId,
1048 candidate_address: SocketAddr,
1049 now: Instant,
1050 events: &mut Vec<DiscoveryEvent>,
1051 ) {
1052 debug!(
1053 "Starting QUIC path validation for candidate {} at {}",
1054 candidate_id.0, candidate_address
1055 );
1056
1057 let challenge_token: u64 = rand::random();
1059
1060 self.pending_validations.insert(
1062 candidate_id,
1063 PendingValidation {
1064 candidate_address,
1065 challenge_token,
1066 started_at: now,
1067 attempts: 1,
1068 },
1069 );
1070
1071 events.push(DiscoveryEvent::PathValidationRequested {
1073 candidate_id,
1074 candidate_address,
1075 challenge_token,
1076 });
1077
1078 debug!(
1079 "PATH_CHALLENGE {:08x} requested for candidate {} at {}",
1080 challenge_token, candidate_id.0, candidate_address
1081 );
1082 }
1083
1084 pub fn handle_path_response(
1086 &mut self,
1087 candidate_address: SocketAddr,
1088 challenge_token: u64,
1089 now: Instant,
1090 ) -> Option<DiscoveryEvent> {
1091 let candidate_id = self
1093 .pending_validations
1094 .iter()
1095 .find(|(_, validation)| {
1096 validation.candidate_address == candidate_address
1097 && validation.challenge_token == challenge_token
1098 })
1099 .map(|(id, _)| *id)?;
1100
1101 let validation = self.pending_validations.remove(&candidate_id)?;
1103 let rtt = now.duration_since(validation.started_at);
1104
1105 debug!(
1106 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1107 candidate_id.0, candidate_address, rtt
1108 );
1109
1110 for session in self.active_sessions.values_mut() {
1112 if let Some(candidate) = session
1113 .discovered_candidates
1114 .iter_mut()
1115 .find(|c| c.address == candidate_address)
1116 {
1117 candidate.state = CandidateState::Valid;
1118 break;
1120 }
1121 }
1122
1123 Some(DiscoveryEvent::PathValidationResponse {
1124 candidate_id,
1125 candidate_address,
1126 challenge_token,
1127 rtt,
1128 })
1129 }
1130
1131 #[allow(dead_code)]
1133 fn simulate_path_validation(
1134 &mut self,
1135 candidate_id: CandidateId,
1136 candidate_address: SocketAddr,
1137 _now: Instant,
1138 ) {
1139 let is_local = candidate_address.ip().is_loopback()
1141 || (candidate_address.ip().is_ipv4()
1142 && candidate_address.ip().to_string().starts_with("192.168."))
1143 || (candidate_address.ip().is_ipv4()
1144 && candidate_address.ip().to_string().starts_with("10."))
1145 || (candidate_address.ip().is_ipv4()
1146 && candidate_address.ip().to_string().starts_with("172."));
1147
1148 let is_server_reflexive = !is_local && !candidate_address.ip().is_unspecified();
1149
1150 debug!(
1153 "Simulated path validation for candidate {} at {} - local: {}, server_reflexive: {}",
1154 candidate_id.0, candidate_address, is_local, is_server_reflexive
1155 );
1156 }
1157
1158 #[allow(dead_code)]
1160 fn simulate_validation_result(&self, address: &SocketAddr) -> ValidationResult {
1161 let is_local = address.ip().is_loopback()
1162 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("192.168."))
1163 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("10."))
1164 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("172."));
1165
1166 if is_local {
1167 ValidationResult::Valid {
1169 rtt: Duration::from_millis(1),
1170 }
1171 } else if address.ip().is_unspecified() {
1172 ValidationResult::Invalid {
1174 reason: "Unspecified address".to_string(),
1175 }
1176 } else {
1177 ValidationResult::Valid {
1179 rtt: Duration::from_millis(50 + (address.port() % 100) as u64),
1180 }
1181 }
1182 }
1183
1184 #[allow(dead_code)]
1186 fn calculate_reliability_score(&self, candidate: &DiscoveryCandidate, rtt: Duration) -> f64 {
1187 let mut score: f64 = 0.5; match candidate.source {
1191 DiscoverySourceType::Local => score += 0.3, DiscoverySourceType::ServerReflexive => score += 0.2, DiscoverySourceType::Predicted => score += 0.1, }
1195
1196 let rtt_ms = rtt.as_millis() as f64;
1198 if rtt_ms < 10.0 {
1199 score += 0.2;
1200 } else if rtt_ms < 50.0 {
1201 score += 0.1;
1202 } else if rtt_ms > 200.0 {
1203 score -= 0.1;
1204 }
1205
1206 if candidate.address.ip().is_ipv6() {
1208 score += 0.05; }
1210
1211 score.max(0.0).min(1.0)
1213 }
1214
1215 #[allow(dead_code)]
1218 fn handle_session_timeout(
1219 &mut self,
1220 session: &mut DiscoverySession,
1221 events: &mut Vec<DiscoveryEvent>,
1222 now: Instant,
1223 ) {
1224 let error = DiscoveryError::DiscoveryTimeout;
1225 let partial_results = session
1226 .discovered_candidates
1227 .iter()
1228 .map(|c| c.to_candidate_address())
1229 .collect();
1230
1231 warn!(
1232 "Discovery failed for peer {:?}: discovery process timed out (found {} partial candidates)",
1233 session.peer_id,
1234 session.discovered_candidates.len()
1235 );
1236 events.push(DiscoveryEvent::DiscoveryFailed {
1237 error: error.clone(),
1238 partial_results,
1239 });
1240
1241 session.current_phase = DiscoveryPhase::Failed {
1242 error,
1243 failed_at: now,
1244 fallback_options: vec![FallbackStrategy::UseCachedResults],
1245 };
1246 }
1247
1248 #[allow(dead_code)]
1249 fn handle_session_local_scan_timeout(
1250 &mut self,
1251 session: &mut DiscoverySession,
1252 events: &mut Vec<DiscoveryEvent>,
1253 now: Instant,
1254 ) {
1255 warn!(
1256 "Local interface scan timeout for peer {:?}, proceeding with available candidates",
1257 session.peer_id
1258 );
1259
1260 events.push(DiscoveryEvent::LocalScanningCompleted {
1261 candidate_count: session.statistics.local_candidates_found as usize,
1262 duration: now.duration_since(session.started_at),
1263 });
1264
1265 self.complete_session_discovery_with_local_candidates(session, events, now);
1267 }
1268
    /// Polls a session that is in the candidate validation phase.
    ///
    /// The phase start time and the validation results are ignored: the
    /// session is completed immediately with its discovered candidates.
    #[allow(dead_code)]
    fn poll_session_candidate_validation(
        &mut self,
        session: &mut DiscoverySession,
        _started_at: Instant,
        _validation_results: &HashMap<CandidateId, ValidationResult>,
        now: Instant,
        events: &mut Vec<DiscoveryEvent>,
    ) {
        self.complete_session_discovery_with_local_candidates(session, events, now);
    }
1286
1287 #[allow(dead_code)]
1288 fn complete_session_discovery_with_local_candidates(
1289 &mut self,
1290 session: &mut DiscoverySession,
1291 events: &mut Vec<DiscoveryEvent>,
1292 now: Instant,
1293 ) {
1294 let duration = now.duration_since(session.started_at);
1296 session.statistics.total_discovery_time = Some(duration);
1297
1298 let success_rate = if session.statistics.local_candidates_found > 0 {
1299 1.0
1300 } else {
1301 0.0
1302 };
1303
1304 let validated_candidates: Vec<ValidatedCandidate> = session
1306 .discovered_candidates
1307 .iter()
1308 .map(|dc| ValidatedCandidate {
1309 id: CandidateId(rand::random()),
1310 address: dc.address,
1311 source: dc.source,
1312 priority: dc.priority,
1313 rtt: None,
1314 reliability_score: 1.0,
1315 })
1316 .collect();
1317
1318 events.push(DiscoveryEvent::DiscoveryCompleted {
1319 candidate_count: validated_candidates.len(),
1320 total_duration: duration,
1321 success_rate,
1322 });
1323
1324 session.current_phase = DiscoveryPhase::Completed {
1325 final_candidates: validated_candidates,
1326 completion_time: now,
1327 };
1328
1329 info!(
1330 "Discovery completed with {} local candidates for peer {:?}",
1331 session.discovered_candidates.len(),
1332 session.peer_id
1333 );
1334 }
1335
1336 #[allow(dead_code)]
1337 fn is_valid_local_address(&self, address: &SocketAddr) -> bool {
1338 use crate::nat_traversal_api::CandidateAddress;
1340
1341 if let Err(e) = CandidateAddress::validate_address(address) {
1342 debug!("Address {} failed validation: {}", address, e);
1343 return false;
1344 }
1345
1346 match address.ip() {
1347 IpAddr::V4(ipv4) => {
1348 #[cfg(test)]
1350 if ipv4.is_loopback() {
1351 return true;
1352 }
1353 !ipv4.is_loopback()
1356 && !ipv4.is_unspecified()
1357 && !ipv4.is_broadcast()
1358 && !ipv4.is_multicast()
1359 && !ipv4.is_documentation()
1360 }
1361 IpAddr::V6(ipv6) => {
1362 #[cfg(test)]
1364 if ipv6.is_loopback() {
1365 return true;
1366 }
1367 let segments = ipv6.segments();
1369 let is_documentation = segments[0] == 0x2001 && segments[1] == 0x0db8;
1370
1371 !ipv6.is_loopback()
1372 && !ipv6.is_unspecified()
1373 && !ipv6.is_multicast()
1374 && !is_documentation
1375 }
1376 }
1377 }
1378
1379 #[allow(dead_code)]
1382 fn calculate_local_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
1383 let mut priority = 100; match address.ip() {
1386 IpAddr::V4(ipv4) => {
1387 if ipv4.is_private() {
1388 priority += 50; }
1390 }
1391 IpAddr::V6(ipv6) => {
1392 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
1395 let segments = ipv6.segments();
1396 if segments[0] & 0xE000 == 0x2000 {
1397 priority += 60;
1399 } else if segments[0] & 0xFFC0 == 0xFE80 {
1400 priority += 20;
1402 } else if segments[0] & 0xFE00 == 0xFC00 {
1403 priority += 40;
1405 } else {
1406 priority += 30;
1408 }
1409 }
1410
1411 priority += 10; }
1414 }
1415
1416 if interface.is_wireless {
1417 priority -= 10; }
1419
1420 priority
1421 }
1422
1423 #[allow(dead_code)]
1424 fn calculate_server_reflexive_priority(&self, response: &ServerReflexiveResponse) -> u32 {
1425 let mut priority = 200; if response.response_time < Duration::from_millis(50) {
1429 priority += 20;
1430 } else if response.response_time > Duration::from_millis(200) {
1431 priority -= 10;
1432 }
1433
1434 let age_bonus = if response.timestamp.elapsed().as_secs() < 60 {
1436 20
1437 } else {
1438 0
1439 };
1440 priority += age_bonus;
1441
1442 priority
1443 }
1444
1445 #[allow(dead_code)]
1446 fn should_transition_to_prediction(
1447 &self,
1448 responses: &[ServerReflexiveResponse],
1449 _now: Instant,
1450 ) -> bool {
1451 responses.len() >= self.config.min_bootstrap_consensus.max(1)
1452 }
1453
1454 #[allow(dead_code)]
1455 #[allow(clippy::panic)]
1456 fn calculate_consensus_address(&self, responses: &[ServerReflexiveResponse]) -> SocketAddr {
1457 let mut address_counts: HashMap<SocketAddr, usize> = HashMap::new();
1459
1460 for response in responses {
1461 *address_counts.entry(response.observed_address).or_insert(0) += 1;
1462 }
1463
1464 address_counts
1465 .into_iter()
1466 .max_by_key(|(_, count)| *count)
1467 .map(|(addr, _)| addr)
1468 .unwrap_or_else(|| {
1469 "0.0.0.0:0"
1470 .parse()
1471 .unwrap_or_else(|_| panic!("hardcoded fallback address should be valid"))
1472 })
1473 }
1474
1475 #[allow(dead_code)]
1477 fn calculate_prediction_accuracy(
1478 &self,
1479 pattern: &PortAllocationPattern,
1480 history: &VecDeque<PortAllocationEvent>,
1481 ) -> f64 {
1482 if history.len() < 3 {
1483 return 0.3; }
1485
1486 let recent_ports: Vec<u16> = history
1488 .iter()
1489 .rev()
1490 .take(10)
1491 .map(|event| event.port)
1492 .collect();
1493
1494 let mut correct_predictions = 0;
1495 let total_predictions = recent_ports.len().saturating_sub(1);
1496
1497 if total_predictions == 0 {
1498 return 0.3;
1499 }
1500
1501 match pattern.pattern_type {
1502 AllocationPatternType::Sequential => {
1503 for i in 1..recent_ports.len() {
1505 if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == 1 {
1506 correct_predictions += 1;
1507 }
1508 }
1509 }
1510 AllocationPatternType::FixedStride => {
1511 for i in 1..recent_ports.len() {
1513 if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == pattern.stride {
1514 correct_predictions += 1;
1515 }
1516 }
1517 }
1518 AllocationPatternType::PoolBased => {
1519 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1521 for port in &recent_ports {
1522 if *port >= min_port && *port <= max_port {
1523 correct_predictions += 1;
1524 }
1525 }
1526 }
1527 }
1528 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1529 if recent_ports.len() >= 3 {
1531 let mean = recent_ports.iter().map(|&p| p as f64).sum::<f64>()
1532 / recent_ports.len() as f64;
1533 let variance = recent_ports
1534 .iter()
1535 .map(|&p| (p as f64 - mean).powi(2))
1536 .sum::<f64>()
1537 / recent_ports.len() as f64;
1538
1539 let normalized_variance = (variance / 10000.0).min(1.0); return 0.2 + (1.0 - normalized_variance) * 0.3; }
1543 }
1544 AllocationPatternType::TimeBased => {
1545 if history.len() >= 2 {
1547 let time_diffs: Vec<Duration> = history
1548 .iter()
1549 .collect::<Vec<_>>()
1550 .windows(2)
1551 .map(|w| w[1].timestamp.duration_since(w[0].timestamp))
1552 .collect();
1553
1554 if !time_diffs.is_empty() {
1555 let avg_diff =
1556 time_diffs.iter().sum::<Duration>() / time_diffs.len() as u32;
1557 let variance = time_diffs
1558 .iter()
1559 .map(|d| d.as_millis().abs_diff(avg_diff.as_millis()) as f64)
1560 .sum::<f64>()
1561 / time_diffs.len() as f64;
1562
1563 let normalized_variance = (variance / 1000.0).min(1.0); return 0.3 + (1.0 - normalized_variance) * 0.4; }
1567 }
1568 }
1569 }
1570
1571 let accuracy = if total_predictions > 0 {
1573 correct_predictions as f64 / total_predictions as f64
1574 } else {
1575 0.3
1576 };
1577
1578 let confidence_adjusted_accuracy = accuracy * pattern.confidence;
1580
1581 confidence_adjusted_accuracy.max(0.2).min(0.9)
1583 }
1584
1585 pub fn accept_quic_discovered_address(
1588 &mut self,
1589 peer_id: PeerId,
1590 discovered_address: SocketAddr,
1591 ) -> Result<bool, DiscoveryError> {
1592 let priority = self.calculate_quic_discovered_priority(&discovered_address);
1594
1595 let session = self.active_sessions.get_mut(&peer_id).ok_or_else(|| {
1597 DiscoveryError::InternalError(format!(
1598 "No active discovery session for peer {peer_id:?}"
1599 ))
1600 })?;
1601
1602 let already_exists = session
1604 .discovered_candidates
1605 .iter()
1606 .any(|c| c.address == discovered_address);
1607
1608 if already_exists {
1609 debug!(
1610 "QUIC-discovered address {} already in candidates",
1611 discovered_address
1612 );
1613 return Ok(false);
1614 }
1615
1616 info!("Accepting QUIC-discovered address: {}", discovered_address);
1617
1618 let candidate = DiscoveryCandidate {
1620 address: discovered_address,
1621 priority,
1622 source: DiscoverySourceType::ServerReflexive,
1623 state: CandidateState::New,
1624 };
1625
1626 session.discovered_candidates.push(candidate);
1628 session.statistics.server_reflexive_candidates_found += 1;
1629
1630 Ok(true)
1631 }
1632
1633 fn calculate_quic_discovered_priority(&self, address: &SocketAddr) -> u32 {
1635 let mut priority = 255; match address.ip() {
1640 IpAddr::V4(ipv4) => {
1641 if ipv4.is_private() {
1642 priority -= 10; } else if ipv4.is_loopback() {
1644 priority -= 20; }
1646 }
1648 IpAddr::V6(ipv6) => {
1649 priority += 10; if ipv6.is_loopback() {
1653 priority -= 30; } else if ipv6.is_multicast() {
1655 priority -= 40; } else if ipv6.is_unspecified() {
1657 priority -= 50; } else {
1659 let segments = ipv6.segments();
1661 if segments[0] & 0xFFC0 == 0xFE80 {
1662 priority -= 30; } else if segments[0] & 0xFE00 == 0xFC00 {
1665 priority -= 10; }
1668 }
1670 }
1671 }
1672
1673 priority
1674 }
1675
1676 #[allow(clippy::panic)]
1678 pub fn poll_discovery_progress(&mut self, peer_id: PeerId) -> Vec<DiscoveryEvent> {
1679 let mut events = Vec::new();
1680
1681 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
1682 for candidate in &session.discovered_candidates {
1684 if matches!(candidate.state, CandidateState::New) {
1685 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1686 candidate: candidate.to_candidate_address(),
1687 bootstrap_node: "0.0.0.0:0".parse().unwrap_or_else(|_| {
1688 panic!("hardcoded placeholder address should be valid")
1689 }), });
1691 }
1692 }
1693
1694 for candidate in &mut session.discovered_candidates {
1696 if matches!(candidate.state, CandidateState::New) {
1697 candidate.state = CandidateState::Validating;
1698 }
1699 }
1700 }
1701
1702 events
1703 }
1704
1705 pub fn get_discovery_status(&self, peer_id: PeerId) -> Option<DiscoveryStatus> {
1707 self.active_sessions.get(&peer_id).map(|session| {
1708 let discovered_candidates = session
1709 .discovered_candidates
1710 .iter()
1711 .map(|c| c.to_candidate_address())
1712 .collect();
1713
1714 DiscoveryStatus {
1715 phase: session.current_phase.clone(),
1716 discovered_candidates,
1717 statistics: session.statistics.clone(),
1718 elapsed_time: session.started_at.elapsed(),
1719 }
1720 })
1721 }
1722}
1723
1724#[derive(Debug, Clone)]
1726pub struct DiscoveryStatus {
1727 pub phase: DiscoveryPhase,
1728 pub discovered_candidates: Vec<CandidateAddress>,
1729 pub statistics: DiscoveryStatistics,
1730 pub elapsed_time: Duration,
1731}
1732
1733#[derive(Debug, Clone)]
1735pub struct DiscoveryResults {
1736 pub candidates: Vec<ValidatedCandidate>,
1737 pub completion_time: Instant,
1738 pub statistics: DiscoveryStatistics,
1739}
1740
1741pub trait NetworkInterfaceDiscovery {
1745 fn start_scan(&mut self) -> Result<(), String>;
1746 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>>;
1747}
1748
/// Description of one network interface as reported by a
/// [`NetworkInterfaceDiscovery`] implementation.
///
/// Every field has total equality, so `Eq` is derived alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkInterface {
    /// Interface name.
    pub name: String,
    /// Socket addresses associated with the interface.
    pub addresses: Vec<SocketAddr>,
    /// Whether the interface is up.
    pub is_up: bool,
    /// Whether the interface is wireless.
    pub is_wireless: bool,
    /// MTU of the interface, when known.
    pub mtu: Option<u16>,
}
1758
1759#[derive(Debug)]
1761#[allow(dead_code)]
1762struct BootstrapConnection {
1763 connection: crate::Connection,
1765 address: SocketAddr,
1767 established_at: Instant,
1769 request_id: u64,
1771}
1772
/// Fields of an address-observation request.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct AddressObservationRequest {
    /// Identifier of the request.
    request_id: u64,
    /// Timestamp value carried by the request (unit not established here).
    timestamp: u64,
    /// Capability value carried by the request (encoding not established here).
    capabilities: u32,
}
1784
/// State for server-reflexive address discovery against bootstrap nodes.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) struct ServerReflexiveDiscovery {
    /// Discovery configuration (cloned from the caller in `new`).
    config: DiscoveryConfig,
    /// State of the query issued to each bootstrap node.
    active_queries: HashMap<BootstrapNodeId, QueryState>,
    /// Responses queued until `poll_queries` hands them out.
    responses: VecDeque<ServerReflexiveResponse>,
    /// Deadline after which the query to a node counts as timed out.
    query_timeouts: HashMap<BootstrapNodeId, Instant>,
    /// Connections keyed by bootstrap node; cleared by `start_queries`.
    active_connections: HashMap<BootstrapNodeId, BootstrapConnection>,
    /// Tokio runtime handle captured at construction, if one was current.
    runtime_handle: Option<tokio::runtime::Handle>,
}
1806
// Query lifecycle for `ServerReflexiveDiscovery`.
//
// `#[cfg(any())]` never evaluates to true, so this impl is currently
// excluded from compilation. Every path that "contacts" a bootstrap node
// ends in `simulate_bootstrap_response`; no network traffic is produced here.
#[cfg(any())]
#[allow(dead_code)]
impl ServerReflexiveDiscovery {
    /// Creates an instance with empty query state, a clone of `config`, and
    /// the current Tokio runtime handle when one is available.
    pub(crate) fn new(config: &DiscoveryConfig) -> Self {
        let runtime_handle = tokio::runtime::Handle::try_current().ok();

        Self {
            config: config.clone(),
            active_queries: HashMap::new(),
            responses: VecDeque::new(),
            query_timeouts: HashMap::new(),
            active_connections: HashMap::new(),
            runtime_handle,
        }
    }

    /// Resets all query state and starts one query per entry of
    /// `bootstrap_nodes`.
    ///
    /// Each node is registered as `QueryState::Pending` with a deadline of
    /// `now + config.bootstrap_query_timeout`. Returns a clone of the
    /// resulting query table.
    #[allow(dead_code)]
    pub(crate) fn start_queries(
        &mut self,
        bootstrap_nodes: &[BootstrapNodeId],
        now: Instant,
    ) -> HashMap<BootstrapNodeId, QueryState> {
        debug!(
            "Starting server reflexive queries to {} bootstrap nodes",
            bootstrap_nodes.len()
        );

        self.active_queries.clear();
        self.query_timeouts.clear();
        self.active_connections.clear();

        for node_id in bootstrap_nodes.iter().copied() {
            let pending = QueryState::Pending {
                sent_at: now,
                attempts: 1,
            };
            self.active_queries.insert(node_id, pending);

            let deadline = now + self.config.bootstrap_query_timeout;
            self.query_timeouts.insert(node_id, deadline);

            debug!(
                "Starting server reflexive query to bootstrap node {:?}",
                node_id
            );

            match self.runtime_handle.clone() {
                Some(runtime) => self.start_bootstrap_query(node_id, runtime, now),
                None => {
                    warn!(
                        "No async runtime available, falling back to simulation for node {:?}",
                        node_id
                    );
                    self.simulate_bootstrap_response(node_id, now);
                }
            }
        }

        self.active_queries.clone()
    }

    /// Draws a random request id, logs it, and queues a simulated response
    /// for `node_id`. The runtime handle is accepted but not used.
    #[allow(dead_code)]
    fn start_bootstrap_query(
        &mut self,
        node_id: BootstrapNodeId,
        _runtime: tokio::runtime::Handle,
        now: Instant,
    ) {
        let request_id: u64 = rand::random();

        debug!(
            "Starting QUIC connection to bootstrap node {:?} with request ID {}",
            node_id, request_id
        );

        self.simulate_bootstrap_response(node_id, now);
    }

    /// Stub: always fails with a "not implemented" error.
    #[allow(dead_code)]
    async fn perform_bootstrap_query(
        _bootstrap_address: SocketAddr,
        _request_id: u64,
        _timeout: Duration,
    ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
        let reason = "Bootstrap query not implemented for low-level API";
        Err(reason.into())
    }

    /// Serialises a 20-byte discovery request: the request id (8 bytes), the
    /// low 64 bits of the current Unix time in milliseconds (8 bytes) and
    /// the constant `1u32` (4 bytes), all big-endian.
    #[allow(dead_code)]
    fn create_discovery_request(request_id: u64) -> Vec<u8> {
        let unix_millis = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        // Truncating to u64 keeps the same low eight bytes as slicing
        // `[8..16]` out of the big-endian u128 representation.
        let timestamp_bytes = (unix_millis as u64).to_be_bytes();

        let mut request = Vec::new();
        request.extend_from_slice(&request_id.to_be_bytes());
        request.extend_from_slice(&timestamp_bytes);
        request.extend_from_slice(&1u32.to_be_bytes());

        debug!(
            "Created discovery request: {} bytes, request_id: {}",
            request.len(),
            request_id
        );
        request
    }

    /// Stub: always fails, pointing at the OBSERVED_ADDRESS mechanism.
    #[allow(dead_code)]
    async fn wait_for_add_address_frame(
        _connection: &Connection,
        _expected_request_id: u64,
    ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
        let reason = "Legacy function - use OBSERVED_ADDRESS frame mechanism instead";
        Err(reason.into())
    }

    /// Creates an unbounded channel and returns only its sender; the
    /// receiver is dropped when this function returns.
    #[allow(dead_code)]
    fn create_response_channel(
        &self,
    ) -> tokio::sync::mpsc::UnboundedSender<ServerReflexiveResponse> {
        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel();
        sender
    }

    /// Returns every response queued so far and processes expired deadlines.
    ///
    /// For each node whose deadline has passed: a pending query below
    /// `config.max_query_retries` gets another attempt (new deadline plus a
    /// simulated response, which is delivered by the next poll); any other
    /// query is marked `QueryState::Failed`. `_active_queries` is not
    /// consulted.
    #[allow(dead_code)]
    pub(crate) fn poll_queries(
        &mut self,
        _active_queries: &HashMap<BootstrapNodeId, QueryState>,
        now: Instant,
    ) -> Vec<ServerReflexiveResponse> {
        // Drained before retries run, so responses queued by a retry below
        // are not part of this call's result.
        let responses: Vec<ServerReflexiveResponse> = self.responses.drain(..).collect();

        let expired_nodes: Vec<BootstrapNodeId> = self
            .query_timeouts
            .iter()
            .filter(|(_, deadline)| now >= **deadline)
            .map(|(node_id, _)| *node_id)
            .collect();

        for node_id in expired_nodes {
            self.query_timeouts.remove(&node_id);

            // `Some(n)` = retry as attempt `n`; `None` = give up on the node.
            let retry_attempt = match self.active_queries.get_mut(&node_id) {
                None => continue,
                Some(QueryState::Pending { attempts, .. })
                    if *attempts < self.config.max_query_retries =>
                {
                    *attempts += 1;
                    Some(*attempts)
                }
                Some(_) => None,
            };

            match retry_attempt {
                Some(attempt) => {
                    let next_deadline = now + self.config.bootstrap_query_timeout;
                    self.query_timeouts.insert(node_id, next_deadline);

                    debug!(
                        "Retrying server reflexive query to bootstrap node {:?} (attempt {})",
                        node_id, attempt
                    );

                    self.simulate_bootstrap_response(node_id, now);
                }
                None => {
                    self.active_queries.insert(node_id, QueryState::Failed);
                    warn!(
                        "Server reflexive query to bootstrap node {:?} failed after retries",
                        node_id
                    );
                }
            }
        }

        responses
    }

    /// Queues a fabricated response for `node_id` and marks its query as
    /// `QueryState::Completed` when one is registered.
    ///
    /// The observed address is one of three hard-coded addresses chosen by
    /// `node_id.0 % 3`; the response time is `50 + node_id.0 * 10` ms.
    fn simulate_bootstrap_response(&mut self, node_id: BootstrapNodeId, now: Instant) {
        let address_literal = match node_id.0 % 3 {
            0 => "203.0.113.1:45678",
            1 => "198.51.100.2:45679",
            _ => "192.0.2.3:45680",
        };
        let simulated_external_addr = address_literal
            .parse()
            .expect("Failed to parse hardcoded test address");

        self.responses.push_back(ServerReflexiveResponse {
            bootstrap_node: node_id,
            observed_address: simulated_external_addr,
            response_time: Duration::from_millis(50 + node_id.0 * 10),
            timestamp: now,
        });

        if let Some(query_state) = self.active_queries.get_mut(&node_id) {
            *query_state = QueryState::Completed;
        }

        debug!(
            "Received simulated server reflexive response from bootstrap node {:?}: {}",
            node_id, simulated_external_addr
        );
    }
}
2162
/// Port predictor for symmetric NATs.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[derive(Debug)]
pub(crate) struct SymmetricNatPredictor {
    /// Discovery configuration (cloned from the caller in `new`).
    #[allow(dead_code)]
    config: DiscoveryConfig,
}
2174
// Port-prediction heuristics for `SymmetricNatPredictor`.
//
// `#[cfg(any())]` never evaluates to true, so this impl is currently
// excluded from compilation.
#[cfg(any())]
impl SymmetricNatPredictor {
    /// Creates a predictor holding its own clone of `config`.
    #[allow(dead_code)]
    pub(crate) fn new(config: &DiscoveryConfig) -> Self {
        Self {
            config: config.clone(),
        }
    }

    /// Converts a requested prediction count to `u16`, saturating at
    /// `u16::MAX`.
    ///
    /// A plain `as u16` cast truncates instead (65 536 would become 0),
    /// which silently drops predictions or causes a division by zero.
    #[allow(dead_code)]
    fn saturating_port_count(max_count: usize) -> u16 {
        max_count.min(usize::from(u16::MAX)) as u16
    }

    /// Generates up to `max_count` predicted candidates.
    ///
    /// Uses pattern-based prediction when a pattern is recognised in the
    /// recent allocation events and heuristic prediction otherwise.
    ///
    /// NOTE(review): `enable_symmetric_prediction`,
    /// `get_recent_allocation_events` and `analyze_allocation_pattern` are
    /// not defined in this impl block or on the struct as visible here;
    /// confirm where they are meant to come from before enabling this code.
    #[allow(dead_code)]
    pub(crate) fn generate_predictions(&mut self, max_count: usize) -> Vec<DiscoveryCandidate> {
        if !self.enable_symmetric_prediction {
            return Vec::new();
        }

        let recent_events = self.get_recent_allocation_events();

        if recent_events.is_empty() {
            return self.generate_heuristic_predictions(&[], max_count);
        }

        if let Some(pattern) = self.analyze_allocation_pattern(&recent_events) {
            self.generate_pattern_based_predictions(&pattern, max_count)
        } else {
            self.generate_heuristic_predictions(
                &recent_events.iter().collect::<Vec<_>>(),
                max_count,
            )
        }
    }

    /// Predicts `base_port + stride * k` (wrapping) for `k` in `1..=count`,
    /// keeping only ports accepted by `is_valid_port`.
    #[allow(dead_code)]
    fn strided_predictions(
        &self,
        base_port: u16,
        stride: u16,
        count: u16,
        confidence: f64,
    ) -> Vec<DiscoveryCandidate> {
        (1..=count)
            // `wrapping_mul` keeps the whole computation in wrapping
            // arithmetic; a plain `*` would panic on overflow in debug builds.
            .map(|k| base_port.wrapping_add(stride.wrapping_mul(k)))
            .filter(|&port| self.is_valid_port(port))
            .map(|port| self.create_predicted_candidate(port, confidence))
            .collect()
    }

    /// Generates predictions according to the detected `pattern`.
    ///
    /// `max_count` is saturated to `u16::MAX`; a `max_count` of zero yields
    /// no predictions for any pattern type.
    #[allow(dead_code)]
    fn generate_pattern_based_predictions(
        &self,
        pattern: &PortAllocationPattern,
        max_count: usize,
    ) -> Vec<DiscoveryCandidate> {
        let count = Self::saturating_port_count(max_count);

        match pattern.pattern_type {
            AllocationPatternType::Sequential => {
                self.strided_predictions(pattern.base_port, 1, count, pattern.confidence)
            }
            AllocationPatternType::FixedStride => self.strided_predictions(
                pattern.base_port,
                pattern.stride,
                count,
                pattern.confidence,
            ),
            AllocationPatternType::PoolBased => match pattern.pool_boundaries {
                // Guarding `count > 0` avoids a division by zero and
                // `min_port <= max_port` avoids an underflow.
                Some((min_port, max_port)) if count > 0 && min_port <= max_port => {
                    // Widened to u32 so `max - min + 1` and `i * step`
                    // cannot overflow.
                    let (low, high) = (u32::from(min_port), u32::from(max_port));
                    let step = ((high - low + 1) / u32::from(count)).max(1);
                    let confidence = pattern.confidence * 0.8;

                    (0..u32::from(count))
                        .map(|i| low + i * step)
                        .take_while(|&port| port <= high)
                        // Lossless: `port <= high <= u16::MAX`.
                        .map(|port| port as u16)
                        .filter(|&port| self.is_valid_port(port))
                        .map(|port| self.create_predicted_candidate(port, confidence))
                        .collect()
                }
                _ => Vec::new(),
            },
            AllocationPatternType::TimeBased => {
                self.strided_predictions(pattern.base_port, 1, count, pattern.confidence * 0.6)
            }
            AllocationPatternType::Random | AllocationPatternType::Unknown => {
                self.generate_statistical_predictions(pattern.base_port, max_count)
            }
        }
    }

    /// Generates predictions from simple rules of thumb anchored at the
    /// port of `recent_events[0]`: the next `max_count / 3` ports, the odd
    /// neighbour of an even port, a few power-of-two strides, and the
    /// stride between the first two events when it lies in `1..=100`.
    ///
    /// The result is truncated to `max_count` entries and may contain the
    /// same port more than once.
    #[allow(dead_code)]
    fn generate_heuristic_predictions(
        &self,
        recent_events: &[&PortAllocationEvent],
        max_count: usize,
    ) -> Vec<DiscoveryCandidate> {
        let mut predictions = Vec::new();

        if let Some(latest_event) = recent_events.first() {
            let base_port = latest_event.port;

            let sequential_count = Self::saturating_port_count(max_count / 3);
            for i in 1..=sequential_count {
                let predicted_port = base_port.wrapping_add(i);
                if self.is_valid_port(predicted_port) {
                    predictions.push(self.create_predicted_candidate(predicted_port, 0.7));
                }
            }

            if base_port % 2 == 0 {
                // An even port is at most 65 534, so `+ 1` cannot overflow.
                let predicted_port = base_port + 1;
                if self.is_valid_port(predicted_port) {
                    predictions.push(self.create_predicted_candidate(predicted_port, 0.6));
                }
            }

            for stride in [2, 4, 8, 16] {
                if predictions.len() >= max_count {
                    break;
                }
                let predicted_port = base_port.wrapping_add(stride);
                if self.is_valid_port(predicted_port) {
                    predictions.push(self.create_predicted_candidate(predicted_port, 0.5));
                }
            }

            if let Some(previous_event) = recent_events.get(1) {
                let stride = base_port.wrapping_sub(previous_event.port);
                if (1..=100).contains(&stride) {
                    for i in 1..=3u16 {
                        if predictions.len() >= max_count {
                            break;
                        }
                        // `stride * i` is at most 300, so it cannot overflow.
                        let predicted_port = base_port.wrapping_add(stride * i);
                        if self.is_valid_port(predicted_port) {
                            predictions.push(self.create_predicted_candidate(predicted_port, 0.4));
                        }
                    }
                }
            }
        }

        predictions.truncate(max_count);
        predictions
    }

    /// Spreads `max_count` predictions evenly over the port range that
    /// contains `base_port`, skipping `base_port` itself and ports rejected
    /// by `is_valid_port`.
    ///
    /// Returns an empty vector when `max_count` is zero.
    #[allow(dead_code)]
    fn generate_statistical_predictions(
        &self,
        base_port: u16,
        max_count: usize,
    ) -> Vec<DiscoveryCandidate> {
        const COMMON_RANGES: [(u16, u16); 4] = [
            (1024, 5000),
            (5000, 10000),
            (10000, 20000),
            (32768, 65535),
        ];

        let divisions = Self::saturating_port_count(max_count);
        if divisions == 0 {
            // Nothing was requested; this also avoids dividing by zero below.
            return Vec::new();
        }

        let (range_start, range_end) = COMMON_RANGES
            .iter()
            .copied()
            .find(|&(min, max)| (min..=max).contains(&base_port))
            .unwrap_or((1024, 65535));

        // Non-zero for every range above.
        let range_size = range_end - range_start;
        let step = (range_size / divisions).max(1);
        let modulus = u64::from(range_size);

        (0..max_count)
            .map(|i| {
                // Reduce `i` first so the product stays far below u64::MAX;
                // (i * step) mod m == ((i mod m) * step) mod m.
                let offset = ((i as u64 % modulus) * u64::from(step)) % modulus;
                // `offset < range_size`, so the sum stays within u16.
                range_start + offset as u16
            })
            .filter(|&port| self.is_valid_port(port) && port != base_port)
            .map(|port| self.create_predicted_candidate(port, 0.3))
            .collect()
    }

    /// Accepts ports from 1024 upwards that are not in a short list of
    /// well-known service ports.
    #[allow(dead_code)]
    fn is_valid_port(&self, port: u16) -> bool {
        const COMMON_PORTS_TO_AVOID: &[u16] = &[
            21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 3389, 5432, 3306, 6379, 27017,
        ];

        // `port >= 1024` already excludes port 0.
        port >= 1024 && !COMMON_PORTS_TO_AVOID.contains(&port)
    }

    /// Builds a `Predicted` candidate on the unspecified IPv4 address
    /// (`0.0.0.0`) with priority `50 * confidence`, truncated to `u32`.
    #[allow(dead_code)]
    fn create_predicted_candidate(&self, port: u16, confidence: f64) -> DiscoveryCandidate {
        const BASE_PRIORITY: f64 = 50.0;

        DiscoveryCandidate {
            // Built from octets, so no string parsing (and no panic path).
            address: SocketAddr::from(([0, 0, 0, 0], port)),
            priority: (BASE_PRIORITY * confidence) as u32,
            source: DiscoverySourceType::Predicted,
            state: CandidateState::New,
        }
    }

    /// Tries the sequential, fixed-stride, pool and time-based detectors in
    /// that order on the last ten events of `history` and returns the first
    /// pattern found. Requires at least three events.
    #[allow(dead_code)]
    pub(crate) fn analyze_allocation_patterns(
        &self,
        history: &VecDeque<PortAllocationEvent>,
    ) -> Option<PortAllocationPattern> {
        if history.len() < 3 {
            return None;
        }

        // Newest first.
        let recent_ports: Vec<u16> = history
            .iter()
            .rev()
            .take(10)
            .map(|event| event.port)
            .collect();

        self.detect_sequential_pattern(&recent_ports)
            .or_else(|| self.detect_stride_pattern(&recent_ports))
            .or_else(|| self.detect_pool_pattern(&recent_ports))
            .or_else(|| self.detect_time_based_pattern(history))
    }

    /// Detects ports that grow by exactly one between consecutive entries
    /// (`ports` is newest first). Needs at least three ports and at least
    /// 60 % matching steps.
    #[allow(dead_code)]
    fn detect_sequential_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
        if ports.len() < 3 {
            return None;
        }

        let sequential_steps = ports
            .windows(2)
            .filter(|pair| pair[0].wrapping_sub(pair[1]) == 1)
            .count();
        let sequential_ratio = sequential_steps as f64 / (ports.len() - 1) as f64;

        if sequential_ratio < 0.6 {
            return None;
        }

        Some(PortAllocationPattern {
            pattern_type: AllocationPatternType::Sequential,
            base_port: ports[0],
            stride: 1,
            pool_boundaries: None,
            confidence: (sequential_ratio * 0.9).min(0.9),
        })
    }

    /// Detects a dominant step larger than one between consecutive entries
    /// (`ports` is newest first). Needs at least four ports; only steps in
    /// `1..=1000` are considered and the dominant one must account for at
    /// least half of them. Equal counts resolve to the smaller step.
    #[allow(dead_code)]
    fn detect_stride_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
        if ports.len() < 4 {
            return None;
        }

        let diffs: Vec<u16> = ports
            .windows(2)
            .map(|pair| pair[0].wrapping_sub(pair[1]))
            .filter(|diff| (1..=1000).contains(diff))
            .collect();

        if diffs.len() < 2 {
            return None;
        }

        let mut diff_counts: HashMap<u16, usize> = HashMap::new();
        for &diff in &diffs {
            *diff_counts.entry(diff).or_insert(0) += 1;
        }

        // The secondary key makes ties independent of HashMap iteration order.
        let (most_common_diff, count) = diff_counts
            .into_iter()
            .max_by_key(|&(diff, count)| (count, std::cmp::Reverse(diff)))?;

        let consistency_ratio = count as f64 / diffs.len() as f64;

        if consistency_ratio >= 0.5 && most_common_diff > 1 {
            Some(PortAllocationPattern {
                pattern_type: AllocationPatternType::FixedStride,
                base_port: ports[0],
                stride: most_common_diff,
                pool_boundaries: None,
                confidence: (consistency_ratio * 0.8).min(0.8),
            })
        } else {
            None
        }
    }

    /// Detects ports spread roughly uniformly over a range of at most
    /// 10 000. Needs at least five ports and a uniformity score of at least
    /// 0.4.
    #[allow(dead_code)]
    fn detect_pool_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
        if ports.len() < 5 {
            return None;
        }

        let mut sorted_ports = ports.to_vec();
        sorted_ports.sort_unstable();

        let min_port = *sorted_ports.first()?;
        let max_port = *sorted_ports.last()?;
        let range = max_port - min_port;
        if range == 0 || range > 10000 {
            return None;
        }

        let gaps = sorted_ports.len() - 1;
        // At most `range`, so the narrowing cast is lossless.
        let expected_step = (usize::from(range) / gaps) as u16;
        if expected_step == 0 {
            // More ports than distinct slots: every step would be scored
            // against zero, which can never reach the threshold.
            return None;
        }

        let expected = f64::from(expected_step);
        let uniform_score = sorted_ports
            .windows(2)
            .map(|pair| {
                let actual = f64::from(pair[1] - pair[0]);
                1.0 - ((actual - expected).abs() / expected).min(1.0)
            })
            .sum::<f64>()
            / gaps as f64;

        if uniform_score < 0.4 {
            return None;
        }

        Some(PortAllocationPattern {
            pattern_type: AllocationPatternType::PoolBased,
            base_port: min_port,
            stride: expected_step,
            pool_boundaries: Some((min_port, max_port)),
            confidence: (uniform_score * 0.7).min(0.7),
        })
    }

    /// Detects allocations that happen at regular time intervals. Needs at
    /// least four events, an average interval strictly between 100 ms and
    /// 10 s, and a consistency score of at least 0.6.
    ///
    /// Intervals are measured from each event to the next one in `history`.
    /// `history` is treated as oldest-first, matching how
    /// `analyze_allocation_patterns` reverses it to obtain the newest ports.
    /// Measuring in the opposite direction makes every interval saturate to
    /// zero. The pattern's `base_port` is the port of the last (newest)
    /// event, in line with the other detectors, which anchor on the newest
    /// port.
    #[allow(dead_code)]
    fn detect_time_based_pattern(
        &self,
        history: &VecDeque<PortAllocationEvent>,
    ) -> Option<PortAllocationPattern> {
        if history.len() < 4 {
            return None;
        }

        let time_intervals: Vec<Duration> = history
            .iter()
            .zip(history.iter().skip(1))
            .map(|(earlier, later)| later.timestamp.duration_since(earlier.timestamp))
            .collect();

        let avg_interval =
            time_intervals.iter().sum::<Duration>() / time_intervals.len() as u32;

        // Checked before scoring so the division below never sees zero.
        let avg_millis = avg_interval.as_millis();
        if avg_millis <= 100 || avg_millis >= 10000 {
            return None;
        }

        let consistency_score = time_intervals
            .iter()
            .map(|&interval| {
                let deviation = if interval >= avg_interval {
                    interval - avg_interval
                } else {
                    avg_interval - interval
                };
                let normalized = deviation.as_millis() as f64 / avg_millis as f64;
                1.0 - normalized.min(1.0)
            })
            .sum::<f64>()
            / time_intervals.len() as f64;

        if consistency_score < 0.6 {
            return None;
        }

        Some(PortAllocationPattern {
            pattern_type: AllocationPatternType::TimeBased,
            base_port: history.back()?.port,
            stride: 1,
            pool_boundaries: None,
            confidence: (consistency_score * 0.6).min(0.6),
        })
    }

    /// Placeholder: currently always returns an empty vector; both
    /// arguments are ignored.
    #[allow(dead_code)]
    pub(crate) fn generate_confidence_scored_predictions(
        &mut self,
        base_address: SocketAddr,
        _max_count: usize,
    ) -> Vec<(DiscoveryCandidate, f64)> {
        let _ = base_address;
        Vec::new()
    }

    /// Placeholder: currently always returns `0.0`.
    #[allow(dead_code)]
    fn calculate_prediction_confidence(&self) -> f64 {
        0.0
    }

    /// Placeholder: currently does nothing with `_new_event`.
    #[allow(dead_code)]
    pub(crate) fn update_pattern_analysis(&self, _new_event: PortAllocationEvent) {}
}
2694
/// Registry of bootstrap nodes together with their health and performance
/// data.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) struct BootstrapNodeManager {
    /// Discovery configuration (cloned from the caller in `new`).
    config: DiscoveryConfig,
    /// Known bootstrap nodes keyed by id.
    bootstrap_nodes: HashMap<BootstrapNodeId, BootstrapNodeInfo>,
    /// Health counters per bootstrap node.
    health_stats: HashMap<BootstrapNodeId, BootstrapHealthStats>,
    /// Aggregated performance data.
    performance_tracker: BootstrapPerformanceTracker,
    /// Instant of the most recent health check, if one has run.
    last_health_check: Option<Instant>,
    /// Minimum time between two health checks.
    health_check_interval: Duration,
    /// Success-rate threshold used by the health checks.
    failover_threshold: f64,
    /// Discovery sources configured for this manager.
    discovery_sources: Vec<BootstrapDiscoverySource>,
}
2709
/// Everything the manager tracks about a single bootstrap node.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub(crate) struct BootstrapNodeInfo {
    /// Socket address of the node.
    pub address: SocketAddr,
    /// Instant at which the node was last seen.
    pub last_seen: Instant,
    /// Whether the node can coordinate.
    pub can_coordinate: bool,
    /// Current health classification.
    pub health_status: BootstrapHealthStatus,
    /// Capabilities recorded for the node.
    pub capabilities: BootstrapCapabilities,
    /// Priority value of the node.
    pub priority: u32,
    /// How the node became known.
    pub discovery_source: BootstrapDiscoverySource,
}
2722
/// Health classification of a bootstrap node.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum BootstrapHealthStatus {
    /// The node is considered healthy.
    Healthy,
    /// The node is considered degraded.
    Degraded,
    /// The node is considered unhealthy.
    Unhealthy,
    /// No classification has been made yet.
    Unknown,
}
2732
/// Feature flags and limits recorded for a bootstrap node.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Clone, Debug, Default)]
pub(crate) struct BootstrapCapabilities {
    /// Whether the node supports NAT traversal.
    pub supports_nat_traversal: bool,
    /// Whether the node supports IPv6.
    pub supports_ipv6: bool,
    /// Whether the node supports QUIC extensions.
    pub supports_quic_extensions: bool,
    /// Upper bound on concurrent coordinations.
    pub max_concurrent_coordinations: u32,
    /// QUIC versions the node supports.
    pub supported_quic_versions: Vec<u32>,
}
2743
/// Connection and coordination counters for a bootstrap node.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Clone, Debug, Default)]
pub(crate) struct BootstrapHealthStats {
    /// Number of connection attempts.
    pub connection_attempts: u32,
    /// Number of successful connections.
    pub successful_connections: u32,
    /// Number of failed connections.
    pub failed_connections: u32,
    /// Average round-trip time, once one has been computed.
    pub average_rtt: Option<Duration>,
    /// Recently recorded round-trip times.
    pub recent_rtts: VecDeque<Duration>,
    /// Instant of the most recent health check, if any.
    pub last_health_check: Option<Instant>,
    /// Number of consecutive failures.
    pub consecutive_failures: u32,
    /// Number of coordination requests.
    pub coordination_requests: u32,
    /// Number of successful coordinations.
    pub successful_coordinations: u32,
}
2758
/// Aggregated performance figures across bootstrap nodes.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Debug, Default)]
pub(crate) struct BootstrapPerformanceTracker {
    /// Overall success rate.
    pub overall_success_rate: f64,
    /// Average response time.
    pub average_response_time: Duration,
    /// Ids of the best-performing nodes.
    pub best_performers: Vec<BootstrapNodeId>,
    /// Recorded performance snapshots.
    pub performance_history: VecDeque<PerformanceSnapshot>,
}
2768
/// One performance sample.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub(crate) struct PerformanceSnapshot {
    /// Instant the sample refers to.
    pub timestamp: Instant,
    /// Number of active nodes in the sample.
    pub active_nodes: u32,
    /// Success rate in the sample.
    pub success_rate: f64,
    /// Average round-trip time in the sample.
    pub average_rtt: Duration,
}
2778
/// How a bootstrap node became known.
///
/// `#[cfg(any())]` never evaluates to true, so this type is currently
/// excluded from compilation.
#[cfg(any())]
#[allow(dead_code)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum BootstrapDiscoverySource {
    /// Statically configured.
    Static,
    /// Obtained through DNS.
    DNS,
    /// Supplied by the user.
    UserProvided,
}
2787
2788#[cfg(any())]
2789#[allow(dead_code)]
2790impl BootstrapNodeManager {
2791 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2792 Self {
2793 config: config.clone(),
2794 bootstrap_nodes: HashMap::new(),
2795 health_stats: HashMap::new(),
2796 performance_tracker: BootstrapPerformanceTracker::default(),
2797 last_health_check: None,
2798 health_check_interval: Duration::from_secs(30),
2799 failover_threshold: 0.3, discovery_sources: vec![
2801 BootstrapDiscoverySource::Static,
2802 BootstrapDiscoverySource::DNS,
2803 BootstrapDiscoverySource::UserProvided,
2804 ],
2805 }
2806 }
2807
2808 #[allow(dead_code)]
2810 pub(crate) fn update_bootstrap_nodes(&mut self, nodes: Vec<BootstrapNode>) {
2811 let now = Instant::now();
2812
2813 for (i, node) in nodes.into_iter().enumerate() {
2815 let node_id = BootstrapNodeId(i as u64);
2816
2817 let node_info = BootstrapNodeInfo {
2818 address: node.address,
2819 last_seen: node.last_seen,
2820 can_coordinate: node.can_coordinate,
2821 health_status: BootstrapHealthStatus::Unknown,
2822 capabilities: BootstrapCapabilities {
2823 supports_nat_traversal: node.can_coordinate,
2824 supports_ipv6: node.address.is_ipv6(),
2825 supports_quic_extensions: true, max_concurrent_coordinations: 100, supported_quic_versions: vec![1], },
2829 priority: self.calculate_initial_priority(&node),
2830 discovery_source: BootstrapDiscoverySource::UserProvided,
2831 };
2832
2833 self.bootstrap_nodes.insert(node_id, node_info);
2834
2835 self.health_stats.entry(node_id).or_default();
2837 }
2838
2839 info!("Updated {} bootstrap nodes", self.bootstrap_nodes.len());
2840 self.schedule_health_check(now);
2841 }
2842
2843 #[allow(dead_code)]
2845 pub(crate) fn get_active_bootstrap_nodes(&self) -> Vec<BootstrapNodeId> {
2846 let mut active_nodes: Vec<_> = self
2847 .bootstrap_nodes
2848 .iter()
2849 .filter(|(_, node)| {
2850 matches!(
2851 node.health_status,
2852 BootstrapHealthStatus::Healthy | BootstrapHealthStatus::Unknown
2853 )
2854 })
2855 .map(|(&id, node)| (id, node))
2856 .collect();
2857
2858 active_nodes.sort_by(|a, b| {
2860 let health_cmp = self.compare_health_status(a.1.health_status, b.1.health_status);
2862 if health_cmp != std::cmp::Ordering::Equal {
2863 return health_cmp;
2864 }
2865
2866 b.1.priority.cmp(&a.1.priority)
2868 });
2869
2870 active_nodes.into_iter().map(|(id, _)| id).collect()
2871 }
2872
2873 #[allow(dead_code)]
2875 pub(crate) fn get_bootstrap_address(&self, id: BootstrapNodeId) -> Option<SocketAddr> {
2876 self.bootstrap_nodes.get(&id).map(|node| node.address)
2877 }
2878
2879 #[allow(dead_code)]
2881 pub(crate) fn perform_health_check(&mut self, now: Instant) {
2882 if let Some(last_check) = self.last_health_check {
2883 if now.duration_since(last_check) < self.health_check_interval {
2884 return; }
2886 }
2887
2888 debug!(
2889 "Performing health check on {} bootstrap nodes",
2890 self.bootstrap_nodes.len()
2891 );
2892
2893 let node_ids: Vec<BootstrapNodeId> = self.bootstrap_nodes.keys().copied().collect();
2895
2896 for node_id in node_ids {
2897 self.check_node_health(node_id, now);
2898 }
2899
2900 self.update_performance_metrics(now);
2901 self.last_health_check = Some(now);
2902 }
2903
2904 #[allow(dead_code)]
2906 fn check_node_health(&mut self, node_id: BootstrapNodeId, now: Instant) {
2907 let node_info_opt = self.bootstrap_nodes.get(&node_id).cloned();
2909 let node_info_for_priority = match node_info_opt {
2910 Some(node_info) => node_info,
2911 None => return, };
2913 let current_health_status = node_info_for_priority.health_status;
2914
2915 let (_success_rate, new_health_status, _average_rtt) = {
2917 let stats = self.health_stats.get_mut(&node_id).unwrap();
2918
2919 let success_rate = if stats.connection_attempts > 0 {
2921 stats.successful_connections as f64 / stats.connection_attempts as f64
2922 } else {
2923 1.0 };
2925
2926 if !stats.recent_rtts.is_empty() {
2928 let total_rtt: Duration = stats.recent_rtts.iter().sum();
2929 stats.average_rtt = Some(total_rtt / stats.recent_rtts.len() as u32);
2930 }
2931
2932 let new_health_status = if stats.consecutive_failures >= 3 {
2934 BootstrapHealthStatus::Unhealthy
2935 } else if success_rate < self.failover_threshold {
2936 BootstrapHealthStatus::Degraded
2937 } else if success_rate >= 0.8 && stats.consecutive_failures == 0 {
2938 BootstrapHealthStatus::Healthy
2939 } else {
2940 current_health_status };
2942
2943 stats.last_health_check = Some(now);
2944
2945 (success_rate, new_health_status, stats.average_rtt)
2946 };
2947
2948 let stats_snapshot = match self.health_stats.get(&node_id) {
2950 Some(stats) => stats,
2951 None => {
2952 warn!("No health stats found for bootstrap node {:?}", node_id);
2953 return;
2954 }
2955 };
2956 let new_priority = self.calculate_dynamic_priority(&node_info_for_priority, stats_snapshot);
2957
2958 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2960 if new_health_status != node_info.health_status {
2961 info!(
2962 "Bootstrap node {:?} health status changed: {:?} -> {:?}",
2963 node_id, node_info.health_status, new_health_status
2964 );
2965 node_info.health_status = new_health_status;
2966 }
2967
2968 node_info.priority = new_priority;
2969 }
2970 }
2971
2972 #[allow(dead_code)]
2974 pub(crate) fn record_connection_attempt(
2975 &mut self,
2976 node_id: BootstrapNodeId,
2977 success: bool,
2978 rtt: Option<Duration>,
2979 ) {
2980 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2981 stats.connection_attempts += 1;
2982
2983 if success {
2984 stats.successful_connections += 1;
2985 stats.consecutive_failures = 0;
2986
2987 if let Some(rtt) = rtt {
2988 stats.recent_rtts.push_back(rtt);
2989 if stats.recent_rtts.len() > 10 {
2990 stats.recent_rtts.pop_front();
2991 }
2992 }
2993 } else {
2994 stats.failed_connections += 1;
2995 stats.consecutive_failures += 1;
2996 }
2997 }
2998
2999 if success {
3001 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
3002 node_info.last_seen = Instant::now();
3003 }
3004 }
3005 }
3006
3007 #[allow(dead_code)]
3009 pub(crate) fn record_coordination_result(&mut self, node_id: BootstrapNodeId, success: bool) {
3010 if let Some(stats) = self.health_stats.get_mut(&node_id) {
3011 stats.coordination_requests += 1;
3012 if success {
3013 stats.successful_coordinations += 1;
3014 }
3015 }
3016 }
3017
3018 #[allow(dead_code)]
3020 pub(crate) fn get_best_performers(&self, count: usize) -> Vec<BootstrapNodeId> {
3021 let mut nodes_with_scores: Vec<_> = self
3022 .bootstrap_nodes
3023 .iter()
3024 .filter_map(|(&id, node)| {
3025 if matches!(node.health_status, BootstrapHealthStatus::Healthy) {
3026 let score = self.calculate_performance_score(id, node);
3027 Some((id, score))
3028 } else {
3029 None
3030 }
3031 })
3032 .collect();
3033
3034 nodes_with_scores
3035 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3036
3037 nodes_with_scores
3038 .into_iter()
3039 .take(count)
3040 .map(|(id, _)| id)
3041 .collect()
3042 }
3043
3044 #[allow(dead_code)]
3046 pub(crate) fn discover_new_nodes(&mut self) -> Result<Vec<BootstrapNodeInfo>, String> {
3047 let mut discovered_nodes = Vec::new();
3048
3049 if let Ok(dns_nodes) = self.discover_via_dns() {
3051 discovered_nodes.extend(dns_nodes);
3052 }
3053
3054 if let Ok(multicast_nodes) = self.discover_via_multicast() {
3056 discovered_nodes.extend(multicast_nodes);
3057 }
3058
3059 for node in &discovered_nodes {
3061 let node_id = BootstrapNodeId(rand::random());
3062 self.bootstrap_nodes.insert(node_id, node.clone());
3063 self.health_stats
3064 .insert(node_id, BootstrapHealthStats::default());
3065 }
3066
3067 if !discovered_nodes.is_empty() {
3068 info!("Discovered {} new bootstrap nodes", discovered_nodes.len());
3069 }
3070
3071 Ok(discovered_nodes)
3072 }
3073
3074 #[allow(dead_code)]
3076 fn discover_via_dns(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
3077 debug!("DNS-based bootstrap discovery not yet implemented");
3080 Ok(Vec::new())
3081 }
3082
3083 #[allow(dead_code)]
3085 fn discover_via_multicast(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
3086 debug!("Multicast-based bootstrap discovery not yet implemented");
3089 Ok(Vec::new())
3090 }
3091
3092 #[allow(dead_code)]
3094 fn calculate_initial_priority(&self, node: &BootstrapNode) -> u32 {
3095 let mut priority = 100; if node.can_coordinate {
3098 priority += 50;
3099 }
3100
3101 if let Some(rtt) = node.rtt {
3102 if rtt < Duration::from_millis(50) {
3103 priority += 30;
3104 } else if rtt < Duration::from_millis(100) {
3105 priority += 20;
3106 } else if rtt < Duration::from_millis(200) {
3107 priority += 10;
3108 }
3109 }
3110
3111 if node.address.is_ipv6() {
3113 priority += 10;
3114 }
3115
3116 priority
3117 }
3118
3119 #[allow(dead_code)]
3121 fn calculate_dynamic_priority(
3122 &self,
3123 node_info: &BootstrapNodeInfo,
3124 stats: &BootstrapHealthStats,
3125 ) -> u32 {
3126 let mut priority = node_info.priority;
3127
3128 let success_rate = if stats.connection_attempts > 0 {
3130 stats.successful_connections as f64 / stats.connection_attempts as f64
3131 } else {
3132 1.0
3133 };
3134
3135 priority = (priority as f64 * success_rate) as u32;
3136
3137 if let Some(avg_rtt) = stats.average_rtt {
3139 if avg_rtt < Duration::from_millis(50) {
3140 priority += 20;
3141 } else if avg_rtt > Duration::from_millis(500) {
3142 priority = priority.saturating_sub(20);
3143 }
3144 }
3145
3146 priority = priority.saturating_sub(stats.consecutive_failures * 10);
3148
3149 priority.max(1) }
3151
3152 #[allow(dead_code)]
3154 fn calculate_performance_score(
3155 &self,
3156 node_id: BootstrapNodeId,
3157 _node_info: &BootstrapNodeInfo,
3158 ) -> f64 {
3159 let stats = self.health_stats.get(&node_id).unwrap();
3160
3161 let mut score = 0.0;
3162
3163 let success_rate = if stats.connection_attempts > 0 {
3165 stats.successful_connections as f64 / stats.connection_attempts as f64
3166 } else {
3167 1.0
3168 };
3169 score += success_rate * 0.4;
3170
3171 if let Some(avg_rtt) = stats.average_rtt {
3173 let rtt_score = (1000.0 - avg_rtt.as_millis() as f64).max(0.0) / 1000.0;
3174 score += rtt_score * 0.3;
3175 } else {
3176 score += 0.3; }
3178
3179 let coord_success_rate = if stats.coordination_requests > 0 {
3181 stats.successful_coordinations as f64 / stats.coordination_requests as f64
3182 } else {
3183 1.0
3184 };
3185 score += coord_success_rate * 0.2;
3186
3187 let stability_score = if stats.consecutive_failures == 0 {
3189 1.0
3190 } else {
3191 1.0 / (stats.consecutive_failures as f64 + 1.0)
3192 };
3193 score += stability_score * 0.1;
3194
3195 score
3196 }
3197
3198 fn compare_health_status(
3200 &self,
3201 a: BootstrapHealthStatus,
3202 b: BootstrapHealthStatus,
3203 ) -> std::cmp::Ordering {
3204 use std::cmp::Ordering;
3205
3206 match (a, b) {
3207 (BootstrapHealthStatus::Healthy, BootstrapHealthStatus::Healthy) => Ordering::Equal,
3208 (BootstrapHealthStatus::Healthy, _) => Ordering::Less, (_, BootstrapHealthStatus::Healthy) => Ordering::Greater,
3210 (BootstrapHealthStatus::Unknown, BootstrapHealthStatus::Unknown) => Ordering::Equal,
3211 (BootstrapHealthStatus::Unknown, _) => Ordering::Less, (_, BootstrapHealthStatus::Unknown) => Ordering::Greater,
3213 (BootstrapHealthStatus::Degraded, BootstrapHealthStatus::Degraded) => Ordering::Equal,
3214 (BootstrapHealthStatus::Degraded, _) => Ordering::Less, (_, BootstrapHealthStatus::Degraded) => Ordering::Greater,
3216 (BootstrapHealthStatus::Unhealthy, BootstrapHealthStatus::Unhealthy) => Ordering::Equal,
3217 }
3218 }
3219
3220 fn update_performance_metrics(&mut self, now: Instant) {
3222 let mut total_attempts = 0;
3223 let mut total_successes = 0;
3224 let mut total_rtt = Duration::ZERO;
3225 let mut rtt_count = 0;
3226
3227 for stats in self.health_stats.values() {
3228 total_attempts += stats.connection_attempts;
3229 total_successes += stats.successful_connections;
3230
3231 if let Some(avg_rtt) = stats.average_rtt {
3232 total_rtt += avg_rtt;
3233 rtt_count += 1;
3234 }
3235 }
3236
3237 self.performance_tracker.overall_success_rate = if total_attempts > 0 {
3238 total_successes as f64 / total_attempts as f64
3239 } else {
3240 1.0
3241 };
3242
3243 self.performance_tracker.average_response_time = if rtt_count > 0 {
3244 total_rtt / rtt_count
3245 } else {
3246 Duration::from_millis(100) };
3248
3249 self.performance_tracker.best_performers = self.get_best_performers(5);
3251
3252 let snapshot = PerformanceSnapshot {
3254 timestamp: now,
3255 active_nodes: self.get_active_bootstrap_nodes().len() as u32,
3256 success_rate: self.performance_tracker.overall_success_rate,
3257 average_rtt: self.performance_tracker.average_response_time,
3258 };
3259
3260 self.performance_tracker
3261 .performance_history
3262 .push_back(snapshot);
3263 if self.performance_tracker.performance_history.len() > 100 {
3264 self.performance_tracker.performance_history.pop_front();
3265 }
3266 }
3267
3268 fn schedule_health_check(&mut self, _now: Instant) {
3270 }
3273
3274 pub(crate) fn get_performance_stats(&self) -> &BootstrapPerformanceTracker {
3276 &self.performance_tracker
3277 }
3278
3279 pub(crate) fn get_node_health_stats(
3281 &self,
3282 node_id: BootstrapNodeId,
3283 ) -> Option<&BootstrapHealthStats> {
3284 self.health_stats.get(&node_id)
3285 }
3286}
3287
3288#[derive(Debug)]
3290pub(crate) struct DiscoveryCache {
3291 #[allow(dead_code)]
3292 config: DiscoveryConfig,
3293}
3294
3295impl DiscoveryCache {
3296 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3297 Self {
3298 config: config.clone(),
3299 }
3300 }
3301}
3302
3303pub(crate) fn create_platform_interface_discovery() -> Box<dyn NetworkInterfaceDiscovery + Send> {
3305 #[cfg(all(target_os = "windows", feature = "network-discovery"))]
3306 return Box::new(WindowsInterfaceDiscovery::new());
3307
3308 #[cfg(all(target_os = "linux", feature = "network-discovery"))]
3309 return Box::new(LinuxInterfaceDiscovery::new());
3310
3311 #[cfg(all(target_os = "macos", feature = "network-discovery"))]
3312 return Box::new(MacOSInterfaceDiscovery::new());
3313
3314 #[cfg(any(
3318 all(target_os = "windows", not(feature = "network-discovery")),
3319 all(target_os = "linux", not(feature = "network-discovery")),
3320 all(target_os = "macos", not(feature = "network-discovery")),
3321 not(any(target_os = "windows", target_os = "linux", target_os = "macos"))
3322 ))]
3323 return Box::new(GenericInterfaceDiscovery::new());
3324}
3325
/// Platform-independent fallback for network interface discovery.
#[allow(dead_code)]
pub(crate) struct GenericInterfaceDiscovery {
    /// Set by `start_scan`, cleared when the result is collected.
    scan_complete: bool,
}

impl GenericInterfaceDiscovery {
    /// Creates a discovery instance with no scan pending.
    #[allow(dead_code)]
    pub(crate) fn new() -> Self {
        let scan_complete = false;
        Self { scan_complete }
    }
}
3348
3349impl NetworkInterfaceDiscovery for GenericInterfaceDiscovery {
3350 fn start_scan(&mut self) -> Result<(), String> {
3351 self.scan_complete = true;
3353 Ok(())
3354 }
3355
3356 #[allow(clippy::panic)]
3357 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>> {
3358 if self.scan_complete {
3359 self.scan_complete = false;
3360 Some(vec![NetworkInterface {
3361 name: "generic".to_string(),
3362 addresses: vec![
3363 "127.0.0.1:0"
3364 .parse()
3365 .unwrap_or_else(|_| panic!("Failed to parse hardcoded localhost address")),
3366 ],
3367 is_up: true,
3368 is_wireless: false,
3369 mtu: Some(1500),
3370 }])
3371 } else {
3372 None
3373 }
3374 }
3375}
3376
3377impl std::fmt::Display for DiscoveryError {
3378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3379 match self {
3380 Self::NoLocalInterfaces => write!(f, "no local network interfaces found"),
3381 Self::AllBootstrapsFailed => write!(f, "all bootstrap node queries failed"),
3382 Self::DiscoveryTimeout => write!(f, "discovery process timed out"),
3383 Self::InsufficientCandidates { found, required } => {
3384 write!(f, "insufficient candidates found: {found} < {required}")
3385 }
3386 Self::NetworkError(msg) => write!(f, "network error: {msg}"),
3387 Self::ConfigurationError(msg) => write!(f, "configuration error: {msg}"),
3388 Self::InternalError(msg) => write!(f, "internal error: {msg}"),
3389 }
3390 }
3391}
3392
3393impl std::error::Error for DiscoveryError {}
3394
/// Address helpers exposed for tests.
pub mod test_utils {
    use super::*;

    /// Computes a priority for `address`.
    ///
    /// Starts at 100. Private IPv4 addresses gain 50. Every IPv6 address gains
    /// 10, plus — unless it is loopback, multicast or unspecified — a bonus
    /// chosen by its first 16-bit segment: 60 for `2000::/3`, 20 for
    /// `fe80::/10`, 40 for `fc00::/7` and 30 for anything else.
    pub fn calculate_address_priority(address: &IpAddr) -> u32 {
        const BASE_PRIORITY: u32 = 100;

        let bonus = match address {
            IpAddr::V4(v4) if v4.is_private() => 50,
            IpAddr::V4(_) => 0,
            IpAddr::V6(v6) => {
                let excluded = v6.is_loopback() || v6.is_multicast() || v6.is_unspecified();
                let prefix_bonus = if excluded {
                    0
                } else {
                    match v6.segments()[0] {
                        first if first & 0xE000 == 0x2000 => 60,
                        first if first & 0xFFC0 == 0xFE80 => 20,
                        first if first & 0xFE00 == 0xFC00 => 40,
                        _ => 30,
                    }
                };
                prefix_bonus + 10
            }
        };

        BASE_PRIORITY + bonus
    }

    /// Returns `true` unless `address` is a loopback or unspecified address.
    pub fn is_valid_address(address: &IpAddr) -> bool {
        let (loopback, unspecified) = match address {
            IpAddr::V4(v4) => (v4.is_loopback(), v4.is_unspecified()),
            IpAddr::V6(v6) => (v6.is_loopback(), v6.is_unspecified()),
        };
        !(loopback || unspecified)
    }
}
3443
3444#[cfg(test)]
3445mod tests {
3446 use super::*;
3447
3448 fn create_test_manager() -> CandidateDiscoveryManager {
3449 let config = DiscoveryConfig {
3450 total_timeout: Duration::from_secs(30),
3451 local_scan_timeout: Duration::from_secs(5),
3452 bootstrap_query_timeout: Duration::from_secs(10),
3453 max_query_retries: 3,
3454 max_candidates: 50,
3455 enable_symmetric_prediction: true,
3456 min_bootstrap_consensus: 2,
3457 interface_cache_ttl: Duration::from_secs(300),
3458 server_reflexive_cache_ttl: Duration::from_secs(600),
3459 bound_address: None,
3460 };
3461 CandidateDiscoveryManager::new(config)
3462 }
3463
3464 #[test]
3465 fn test_accept_quic_discovered_addresses() {
3466 let mut manager = create_test_manager();
3467 let peer_id = PeerId([1; 32]);
3468
3469 manager
3471 .start_discovery(peer_id, vec![])
3472 .expect("Failed to start discovery in test");
3473
3474 let discovered_addr = "192.168.1.100:5000"
3476 .parse()
3477 .expect("Failed to parse test address");
3478 let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3479
3480 assert!(result.is_ok());
3481
3482 if let Some(session) = manager.active_sessions.get(&peer_id) {
3484 let found = session.discovered_candidates.iter().any(|c| {
3485 c.address == discovered_addr
3486 && matches!(c.source, DiscoverySourceType::ServerReflexive)
3487 });
3488 assert!(found, "QUIC-discovered address should be in candidates");
3489 }
3490 }
3491
3492 #[test]
3493 fn test_accept_quic_discovered_addresses_no_session() {
3494 let mut manager = create_test_manager();
3495 let peer_id = PeerId([1; 32]);
3496 let discovered_addr = "192.168.1.100:5000"
3497 .parse()
3498 .expect("Failed to parse test address");
3499
3500 let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3502
3503 assert!(result.is_err());
3504 match result {
3505 Err(DiscoveryError::InternalError(msg)) => {
3506 assert!(msg.contains("No active discovery session"));
3507 }
3508 _ => panic!("Expected InternalError for missing session"),
3509 }
3510 }
3511
3512 #[test]
3513 fn test_accept_quic_discovered_addresses_deduplication() {
3514 let mut manager = create_test_manager();
3515 let peer_id = PeerId([1; 32]);
3516
3517 manager
3519 .start_discovery(peer_id, vec![])
3520 .expect("Failed to start discovery in test");
3521
3522 let discovered_addr = "192.168.1.100:5000"
3524 .parse()
3525 .expect("Failed to parse test address");
3526 let result1 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3527 let result2 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3528
3529 assert!(result1.is_ok());
3530 assert!(result2.is_ok()); if let Some(session) = manager.active_sessions.get(&peer_id) {
3534 let count = session
3535 .discovered_candidates
3536 .iter()
3537 .filter(|c| c.address == discovered_addr)
3538 .count();
3539 assert_eq!(count, 1, "Should not have duplicate addresses");
3540 }
3541 }
3542
3543 #[test]
3544 fn test_accept_quic_discovered_addresses_priority() {
3545 let mut manager = create_test_manager();
3546 let peer_id = PeerId([1; 32]);
3547
3548 manager
3550 .start_discovery(peer_id, vec![])
3551 .expect("Failed to start discovery in test");
3552
3553 let public_addr = "8.8.8.8:5000"
3555 .parse()
3556 .expect("Failed to parse test address");
3557 let private_addr = "192.168.1.100:5000"
3558 .parse()
3559 .expect("Failed to parse test address");
3560 let ipv6_addr = "[2001:db8::1]:5000"
3561 .parse()
3562 .expect("Failed to parse test address");
3563
3564 manager
3565 .accept_quic_discovered_address(peer_id, public_addr)
3566 .expect("Failed to accept public address in test");
3567 manager
3568 .accept_quic_discovered_address(peer_id, private_addr)
3569 .expect("Failed to accept private address in test");
3570 manager
3571 .accept_quic_discovered_address(peer_id, ipv6_addr)
3572 .unwrap();
3573
3574 if let Some(session) = manager.active_sessions.get(&peer_id) {
3576 for candidate in &session.discovered_candidates {
3577 assert!(
3578 candidate.priority > 0,
3579 "All candidates should have non-zero priority"
3580 );
3581
3582 if candidate.address == ipv6_addr {
3584 let ipv4_priority = session
3585 .discovered_candidates
3586 .iter()
3587 .find(|c| c.address == public_addr)
3588 .map(|c| c.priority)
3589 .expect("Public address should be found in candidates");
3590
3591 assert!(candidate.priority >= ipv4_priority);
3593 }
3594 }
3595 }
3596 }
3597
3598 #[test]
3599 fn test_accept_quic_discovered_addresses_event_generation() {
3600 let mut manager = create_test_manager();
3601 let peer_id = PeerId([1; 32]);
3602
3603 manager
3605 .start_discovery(peer_id, vec![])
3606 .expect("Failed to start discovery in test");
3607
3608 let discovered_addr = "192.168.1.100:5000"
3610 .parse()
3611 .expect("Failed to parse test address");
3612 manager
3613 .accept_quic_discovered_address(peer_id, discovered_addr)
3614 .expect("Failed to accept address in test");
3615
3616 let events = manager.poll_discovery_progress(peer_id);
3618
3619 let has_event = events.iter().any(|e| {
3621 matches!(e,
3622 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
3623 if candidate.address == discovered_addr
3624 )
3625 });
3626
3627 assert!(
3628 has_event,
3629 "Should generate discovery event for QUIC-discovered address"
3630 );
3631 }
3632
3633 #[test]
3634 fn test_discovery_completes_without_server_reflexive_phase() {
3635 let mut manager = create_test_manager();
3636 let peer_id = PeerId([1; 32]);
3637
3638 manager
3640 .start_discovery(peer_id, vec![])
3641 .expect("Failed to start discovery in test");
3642
3643 let discovered_addr = "192.168.1.100:5000"
3645 .parse()
3646 .expect("Failed to parse test address");
3647 manager
3648 .accept_quic_discovered_address(peer_id, discovered_addr)
3649 .expect("Failed to accept address in test");
3650
3651 let status = manager
3653 .get_discovery_status(peer_id)
3654 .expect("Failed to get discovery status in test");
3655
3656 match &status.phase {
3658 DiscoveryPhase::ServerReflexiveQuerying { .. } => {
3659 panic!("Should not be in ServerReflexiveQuerying phase when using QUIC discovery");
3660 }
3661 _ => {} }
3663 }
3664
3665 #[test]
3666 fn test_no_bootstrap_queries_when_using_quic_discovery() {
3667 let mut manager = create_test_manager();
3668 let peer_id = PeerId([1; 32]);
3669
3670 manager
3672 .start_discovery(peer_id, vec![])
3673 .expect("Failed to start discovery in test");
3674
3675 let addr1 = "192.168.1.100:5000"
3677 .parse()
3678 .expect("Failed to parse test address");
3679 let addr2 = "8.8.8.8:5000"
3680 .parse()
3681 .expect("Failed to parse test address");
3682 manager
3683 .accept_quic_discovered_address(peer_id, addr1)
3684 .expect("Failed to accept address in test");
3685 manager
3686 .accept_quic_discovered_address(peer_id, addr2)
3687 .expect("Failed to accept address in test");
3688
3689 let status = manager
3691 .get_discovery_status(peer_id)
3692 .expect("Failed to get discovery status in test");
3693
3694 assert!(status.discovered_candidates.len() >= 2);
3696
3697 if let Some(session) = manager.active_sessions.get(&peer_id) {
3699 assert_eq!(
3701 session.statistics.bootstrap_queries_sent, 0,
3702 "Should not query bootstrap nodes when using QUIC discovery"
3703 );
3704 }
3705 }
3706
3707 #[test]
3708 fn test_priority_differences_quic_vs_placeholder() {
3709 let mut manager = create_test_manager();
3710 let peer_id = PeerId([1; 32]);
3711
3712 manager
3714 .start_discovery(peer_id, vec![])
3715 .expect("Failed to start discovery in test");
3716
3717 let discovered_addr = "8.8.8.8:5000"
3719 .parse()
3720 .expect("Failed to parse test address");
3721 manager
3722 .accept_quic_discovered_address(peer_id, discovered_addr)
3723 .expect("Failed to accept address in test");
3724
3725 if let Some(session) = manager.active_sessions.get(&peer_id) {
3727 let candidate = session
3728 .discovered_candidates
3729 .iter()
3730 .find(|c| c.address == discovered_addr)
3731 .expect("Should find the discovered address");
3732
3733 assert!(
3735 candidate.priority > 100,
3736 "QUIC-discovered address should have good priority"
3737 );
3738 assert!(candidate.priority < 300, "Priority should be reasonable");
3739
3740 assert!(matches!(
3742 candidate.source,
3743 DiscoverySourceType::ServerReflexive
3744 ));
3745 }
3746 }
3747
3748 #[test]
3749 fn test_quic_discovered_address_priority_calculation() {
3750 let mut manager = create_test_manager();
3752 let peer_id = PeerId([1; 32]);
3753
3754 manager
3756 .start_discovery(peer_id, vec![])
3757 .expect("Failed to start discovery in test");
3758
3759 let test_cases = vec![
3761 ("1.2.3.4:5678", (250, 260), "Public IPv4"),
3763 ("192.168.1.100:9000", (240, 250), "Private IPv4"),
3764 ("[2001:db8::1]:5678", (260, 280), "Global IPv6"),
3765 ("[fe80::1]:5678", (220, 240), "Link-local IPv6"),
3766 ("[fc00::1]:5678", (240, 260), "Unique local IPv6"),
3767 ("10.0.0.1:9000", (240, 250), "Private IPv4 (10.x)"),
3768 ("172.16.0.1:9000", (240, 250), "Private IPv4 (172.16.x)"),
3769 ];
3770
3771 for (addr_str, (min_priority, max_priority), description) in test_cases {
3772 let addr: SocketAddr = addr_str.parse().expect("Failed to parse test address");
3773 manager
3774 .accept_quic_discovered_address(peer_id, addr)
3775 .expect("Failed to accept address in test");
3776
3777 let session = manager
3778 .active_sessions
3779 .get(&peer_id)
3780 .expect("Session should exist in test");
3781 let candidate = session
3782 .discovered_candidates
3783 .iter()
3784 .find(|c| c.address == addr)
3785 .unwrap_or_else(|| panic!("No candidate found for {}", description));
3786
3787 assert!(
3788 candidate.priority >= min_priority && candidate.priority <= max_priority,
3789 "{} priority {} not in range [{}, {}]",
3790 description,
3791 candidate.priority,
3792 min_priority,
3793 max_priority
3794 );
3795 }
3796 }
3797
3798 #[test]
3799 fn test_quic_discovered_priority_factors() {
3800 let manager = create_test_manager();
3802
3803 let base_priority = manager.calculate_quic_discovered_priority(
3805 &"1.2.3.4:5678"
3806 .parse()
3807 .expect("Failed to parse test address"),
3808 );
3809 assert_eq!(
3810 base_priority, 255,
3811 "Base priority should be 255 for public IPv4"
3812 );
3813
3814 let ipv6_priority = manager.calculate_quic_discovered_priority(
3816 &"[2001:db8::1]:5678"
3817 .parse()
3818 .expect("Failed to parse test address"),
3819 );
3820 assert!(
3821 ipv6_priority > base_priority,
3822 "IPv6 should have higher priority than IPv4"
3823 );
3824
3825 let private_priority = manager.calculate_quic_discovered_priority(
3827 &"192.168.1.1:5678"
3828 .parse()
3829 .expect("Failed to parse test address"),
3830 );
3831 assert!(
3832 private_priority < base_priority,
3833 "Private addresses should have lower priority"
3834 );
3835
3836 let link_local_priority = manager.calculate_quic_discovered_priority(
3838 &"[fe80::1]:5678"
3839 .parse()
3840 .expect("Failed to parse test address"),
3841 );
3842 assert!(
3843 link_local_priority < private_priority,
3844 "Link-local should have lower priority than private"
3845 );
3846 }
3847
3848 #[test]
3849 fn test_quic_discovered_addresses_override_stale_server_reflexive() {
3850 let mut manager = create_test_manager();
3852 let peer_id = PeerId([1; 32]);
3853
3854 manager
3856 .start_discovery(peer_id, vec![])
3857 .expect("Failed to start discovery in test");
3858
3859 let session = manager
3861 .active_sessions
3862 .get_mut(&peer_id)
3863 .expect("Session should exist in test");
3864 let old_candidate = DiscoveryCandidate {
3865 address: "1.2.3.4:1234"
3866 .parse()
3867 .expect("Failed to parse test address"),
3868 priority: 200,
3869 source: DiscoverySourceType::ServerReflexive,
3870 state: CandidateState::Validating,
3871 };
3872 session.discovered_candidates.push(old_candidate);
3873
3874 let new_addr = "1.2.3.4:5678"
3876 .parse()
3877 .expect("Failed to parse test address");
3878 manager
3879 .accept_quic_discovered_address(peer_id, new_addr)
3880 .expect("Failed to accept address in test");
3881
3882 let session = manager
3884 .active_sessions
3885 .get(&peer_id)
3886 .expect("Session should exist in test");
3887 let candidates: Vec<_> = session
3888 .discovered_candidates
3889 .iter()
3890 .filter(|c| c.source == DiscoverySourceType::ServerReflexive)
3891 .collect();
3892
3893 assert_eq!(
3894 candidates.len(),
3895 2,
3896 "Should have both old and new candidates"
3897 );
3898
3899 let new_candidate = candidates
3901 .iter()
3902 .find(|c| c.address == new_addr)
3903 .expect("New candidate should be found");
3904 assert_ne!(
3905 new_candidate.priority, 200,
3906 "New candidate should have recalculated priority"
3907 );
3908 }
3909
3910 #[test]
3911 fn test_quic_discovered_address_generates_events() {
3912 let mut manager = create_test_manager();
3914 let peer_id = PeerId([1; 32]);
3915
3916 manager
3918 .start_discovery(peer_id, vec![])
3919 .expect("Failed to start discovery in test");
3920
3921 manager.poll_discovery_progress(peer_id);
3923
3924 let discovered_addr = "8.8.8.8:5000"
3926 .parse()
3927 .expect("Failed to parse test address");
3928 manager
3929 .accept_quic_discovered_address(peer_id, discovered_addr)
3930 .expect("Failed to accept address in test");
3931
3932 let events = manager.poll_discovery_progress(peer_id);
3934
3935 assert!(
3937 !events.is_empty(),
3938 "Should generate events for new QUIC-discovered address"
3939 );
3940
3941 let has_new_candidate = events.iter().any(|e| {
3943 matches!(e,
3944 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
3945 if candidate.address == discovered_addr
3946 )
3947 });
3948 assert!(
3949 has_new_candidate,
3950 "Should generate ServerReflexiveCandidateDiscovered event for the discovered address"
3951 );
3952 }
3953
3954 #[test]
3955 fn test_multiple_quic_discovered_addresses_generate_events() {
3956 let mut manager = create_test_manager();
3958 let peer_id = PeerId([1; 32]);
3959
3960 manager
3962 .start_discovery(peer_id, vec![])
3963 .expect("Failed to start discovery in test");
3964
3965 manager.poll_discovery_progress(peer_id);
3967
3968 let addresses = vec![
3970 "8.8.8.8:5000"
3971 .parse()
3972 .expect("Failed to parse test address"),
3973 "1.1.1.1:6000"
3974 .parse()
3975 .expect("Failed to parse test address"),
3976 "[2001:db8::1]:7000"
3977 .parse()
3978 .expect("Failed to parse test address"),
3979 ];
3980
3981 for addr in &addresses {
3982 manager
3983 .accept_quic_discovered_address(peer_id, *addr)
3984 .expect("Failed to accept address in test");
3985 }
3986
3987 let events = manager.poll_discovery_progress(peer_id);
3989
3990 for addr in &addresses {
3992 let has_event = events.iter().any(|e| {
3993 matches!(e,
3994 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
3995 if candidate.address == *addr
3996 )
3997 });
3998 assert!(has_event, "Should have event for address {addr}");
3999 }
4000 }
4001
4002 #[test]
4003 fn test_duplicate_quic_discovered_address_no_event() {
4004 let mut manager = create_test_manager();
4006 let peer_id = PeerId([1; 32]);
4007
4008 manager
4010 .start_discovery(peer_id, vec![])
4011 .expect("Failed to start discovery in test");
4012
4013 let discovered_addr = "8.8.8.8:5000"
4015 .parse()
4016 .expect("Failed to parse test address");
4017 manager
4018 .accept_quic_discovered_address(peer_id, discovered_addr)
4019 .expect("Failed to accept address in test");
4020
4021 manager.poll_discovery_progress(peer_id);
4023
4024 manager
4026 .accept_quic_discovered_address(peer_id, discovered_addr)
4027 .expect("Failed to accept address in test");
4028
4029 let events = manager.poll_discovery_progress(peer_id);
4031
4032 let has_duplicate_event = events.iter().any(|e| {
4034 matches!(e,
4035 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
4036 if candidate.address == discovered_addr
4037 )
4038 });
4039
4040 assert!(
4041 !has_duplicate_event,
4042 "Should not generate event for duplicate address"
4043 );
4044 }
4045
4046 #[test]
4047 fn test_quic_discovered_address_event_timing() {
4048 let mut manager = create_test_manager();
4050 let peer_id = PeerId([1; 32]);
4051
4052 manager
4054 .start_discovery(peer_id, vec![])
4055 .expect("Failed to start discovery in test");
4056
4057 manager.poll_discovery_progress(peer_id);
4059
4060 let addr1 = "8.8.8.8:5000"
4062 .parse()
4063 .expect("Failed to parse test address");
4064 let addr2 = "1.1.1.1:6000"
4065 .parse()
4066 .expect("Failed to parse test address");
4067
4068 manager
4069 .accept_quic_discovered_address(peer_id, addr1)
4070 .expect("Failed to accept address in test");
4071 manager
4072 .accept_quic_discovered_address(peer_id, addr2)
4073 .expect("Failed to accept address in test");
4074
4075 let events = manager.poll_discovery_progress(peer_id);
4078
4079 let server_reflexive_count = events
4081 .iter()
4082 .filter(|e| matches!(e, DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. }))
4083 .count();
4084
4085 assert!(
4086 server_reflexive_count >= 2,
4087 "Should deliver all queued events on poll, got {server_reflexive_count} events"
4088 );
4089
4090 let events2 = manager.poll_discovery_progress(peer_id);
4092 let server_reflexive_count2 = events2
4093 .iter()
4094 .filter(|e| matches!(e, DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. }))
4095 .count();
4096 assert_eq!(
4097 server_reflexive_count2, 0,
4098 "Server reflexive events should not be duplicated on subsequent polls"
4099 );
4100 }
4101
4102 #[test]
4103 fn test_is_valid_local_address() {
4104 let manager = create_test_manager();
4105
4106 assert!(
4108 manager.is_valid_local_address(
4109 &"192.168.1.1:8080"
4110 .parse()
4111 .expect("Failed to parse test address")
4112 )
4113 );
4114 assert!(
4115 manager.is_valid_local_address(
4116 &"10.0.0.1:8080"
4117 .parse()
4118 .expect("Failed to parse test address")
4119 )
4120 );
4121 assert!(
4122 manager.is_valid_local_address(
4123 &"172.16.0.1:8080"
4124 .parse()
4125 .expect("Failed to parse test address")
4126 )
4127 );
4128
4129 assert!(
4131 manager.is_valid_local_address(
4132 &"[2001:4860:4860::8888]:8080"
4133 .parse()
4134 .expect("Failed to parse test address")
4135 )
4136 );
4137 assert!(
4138 manager.is_valid_local_address(
4139 &"[fe80::1]:8080"
4140 .parse()
4141 .expect("Failed to parse test address")
4142 )
4143 ); assert!(
4145 manager.is_valid_local_address(
4146 &"[fc00::1]:8080"
4147 .parse()
4148 .expect("Failed to parse test address")
4149 )
4150 ); assert!(
4154 !manager.is_valid_local_address(
4155 &"0.0.0.0:8080"
4156 .parse()
4157 .expect("Failed to parse test address")
4158 )
4159 );
4160 assert!(
4161 !manager.is_valid_local_address(
4162 &"255.255.255.255:8080"
4163 .parse()
4164 .expect("Failed to parse test address")
4165 )
4166 );
4167 assert!(
4168 !manager.is_valid_local_address(
4169 &"224.0.0.1:8080"
4170 .parse()
4171 .expect("Failed to parse test address")
4172 )
4173 ); assert!(
4175 !manager.is_valid_local_address(
4176 &"0.0.0.1:8080"
4177 .parse()
4178 .expect("Failed to parse test address")
4179 )
4180 ); assert!(
4182 !manager.is_valid_local_address(
4183 &"240.0.0.1:8080"
4184 .parse()
4185 .expect("Failed to parse test address")
4186 )
4187 ); assert!(
4189 !manager.is_valid_local_address(
4190 &"[::]:8080".parse().expect("Failed to parse test address")
4191 )
4192 ); assert!(
4194 !manager.is_valid_local_address(
4195 &"[ff02::1]:8080"
4196 .parse()
4197 .expect("Failed to parse test address")
4198 )
4199 ); assert!(
4201 !manager.is_valid_local_address(
4202 &"[2001:db8::1]:8080"
4203 .parse()
4204 .expect("Failed to parse test address")
4205 )
4206 ); assert!(
4210 !manager.is_valid_local_address(
4211 &"192.168.1.1:0"
4212 .parse()
4213 .expect("Failed to parse test address")
4214 )
4215 );
4216
4217 #[cfg(test)]
4219 {
4220 assert!(
4221 manager.is_valid_local_address(
4222 &"127.0.0.1:8080"
4223 .parse()
4224 .expect("Failed to parse test address")
4225 )
4226 );
4227 assert!(manager.is_valid_local_address(
4228 &"[::1]:8080".parse().expect("Failed to parse test address")
4229 ));
4230 }
4231 }
4232
    // NOTE(review): this test has an empty body, so it always passes without
    // checking anything. Either implement the rejection checks its name
    // promises or remove it so the test report does not overstate coverage.
    #[test]
    fn test_validation_rejects_invalid_addresses() {}
4235
4236 #[test]
4237 fn test_candidate_validation_error_types() {
4238 use crate::nat_traversal_api::{CandidateAddress, CandidateValidationError};
4239
4240 assert!(matches!(
4242 CandidateAddress::validate_address(&"192.168.1.1:0".parse().unwrap()),
4243 Err(CandidateValidationError::InvalidPort(0))
4244 ));
4245
4246 assert!(matches!(
4247 CandidateAddress::validate_address(&"0.0.0.0:8080".parse().unwrap()),
4248 Err(CandidateValidationError::UnspecifiedAddress)
4249 ));
4250
4251 assert!(matches!(
4252 CandidateAddress::validate_address(&"255.255.255.255:8080".parse().unwrap()),
4253 Err(CandidateValidationError::BroadcastAddress)
4254 ));
4255
4256 assert!(matches!(
4257 CandidateAddress::validate_address(&"224.0.0.1:8080".parse().unwrap()),
4258 Err(CandidateValidationError::MulticastAddress)
4259 ));
4260
4261 assert!(matches!(
4262 CandidateAddress::validate_address(&"240.0.0.1:8080".parse().unwrap()),
4263 Err(CandidateValidationError::ReservedAddress)
4264 ));
4265
4266 assert!(matches!(
4267 CandidateAddress::validate_address(&"[2001:db8::1]:8080".parse().unwrap()),
4268 Err(CandidateValidationError::DocumentationAddress)
4269 ));
4270
4271 assert!(matches!(
4272 CandidateAddress::validate_address(&"[::ffff:192.168.1.1]:8080".parse().unwrap()),
4273 Err(CandidateValidationError::IPv4MappedAddress)
4274 ));
4275 }
4276}