1#![allow(missing_docs)]
8
9use std::{
18 collections::{HashMap, VecDeque},
19 net::{IpAddr, SocketAddr},
20 sync::Arc,
21 time::{Duration, Instant},
22};
23
24use tracing::{debug, error, info, warn};
25
26use crate::Connection;
27
28use crate::{
29 connection::nat_traversal::{CandidateSource, CandidateState},
30 nat_traversal_api::{BootstrapNode, CandidateAddress, PeerId},
31};
32
33#[cfg(all(target_os = "windows", feature = "network-discovery"))]
35pub mod windows;
36
37#[cfg(all(target_os = "windows", feature = "network-discovery"))]
38pub use windows::WindowsInterfaceDiscovery;
39
40#[cfg(all(target_os = "linux", feature = "network-discovery"))]
41pub mod linux;
42
43#[cfg(all(target_os = "linux", feature = "network-discovery"))]
44pub use linux::LinuxInterfaceDiscovery;
45
46#[cfg(all(target_os = "macos", feature = "network-discovery"))]
47pub mod macos;
48
49#[cfg(all(target_os = "macos", feature = "network-discovery"))]
50pub use macos::MacOSInterfaceDiscovery;
51
52fn convert_to_nat_source(discovery_source: DiscoverySourceType) -> CandidateSource {
54 match discovery_source {
55 DiscoverySourceType::Local => CandidateSource::Local,
56 DiscoverySourceType::ServerReflexive => CandidateSource::Observed { by_node: None },
57 DiscoverySourceType::Predicted => CandidateSource::Predicted,
58 }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum DiscoverySourceType {
67 Local,
72
73 ServerReflexive,
78
79 Predicted,
84}
85
86#[derive(Debug, Clone)]
88pub(crate) struct DiscoveryCandidate {
89 pub address: SocketAddr,
90 pub priority: u32,
91 pub source: DiscoverySourceType,
92 pub state: CandidateState,
93}
94
95impl DiscoveryCandidate {
96 pub(crate) fn to_candidate_address(&self) -> CandidateAddress {
98 CandidateAddress {
99 address: self.address,
100 priority: self.priority,
101 source: convert_to_nat_source(self.source),
102 state: self.state,
103 }
104 }
105}
106
107#[derive(Debug)]
109
110pub struct DiscoverySession {
111 #[allow(dead_code)]
113 peer_id: PeerId,
114 #[allow(dead_code)]
116 session_id: u64,
117 current_phase: DiscoveryPhase,
119 started_at: Instant,
121 discovered_candidates: Vec<DiscoveryCandidate>,
123 statistics: DiscoveryStatistics,
125 #[allow(dead_code)]
127 allocation_history: VecDeque<PortAllocationEvent>,
128}
129
130pub struct CandidateDiscoveryManager {
132 config: DiscoveryConfig,
134 interface_discovery: Arc<std::sync::Mutex<Box<dyn NetworkInterfaceDiscovery + Send>>>,
136 #[allow(dead_code)]
140 cache: DiscoveryCache,
141 active_sessions: HashMap<PeerId, DiscoverySession>,
143 cached_local_candidates: Option<(Instant, Vec<ValidatedCandidate>)>,
145 #[allow(dead_code)]
147 local_cache_duration: Duration,
148 pending_validations: HashMap<CandidateId, PendingValidation>,
150}
151
152#[derive(Debug, Clone)]
154pub struct DiscoveryConfig {
155 pub total_timeout: Duration,
157 pub local_scan_timeout: Duration,
159 pub bootstrap_query_timeout: Duration,
161 pub max_query_retries: u32,
163 pub max_candidates: usize,
165 pub enable_symmetric_prediction: bool,
167 pub min_bootstrap_consensus: usize,
169 pub interface_cache_ttl: Duration,
171 pub server_reflexive_cache_ttl: Duration,
173 pub bound_address: Option<SocketAddr>,
175}
176
177#[derive(Debug, Clone, PartialEq)]
179#[allow(missing_docs)]
180pub enum DiscoveryPhase {
181 Idle,
183 LocalInterfaceScanning { started_at: Instant },
185 ServerReflexiveQuerying {
187 started_at: Instant,
188 active_queries: HashMap<BootstrapNodeId, QueryState>,
189 responses_received: Vec<ServerReflexiveResponse>,
190 },
191 CandidateValidation {
194 started_at: Instant,
195 validation_results: HashMap<CandidateId, ValidationResult>,
196 },
197 Completed {
199 final_candidates: Vec<ValidatedCandidate>,
200 completion_time: Instant,
201 },
202 Failed {
204 error: DiscoveryError,
206 failed_at: Instant,
208 fallback_options: Vec<FallbackStrategy>,
210 },
211}
212
213#[derive(Debug, Clone)]
215pub enum DiscoveryEvent {
216 DiscoveryStarted {
218 peer_id: PeerId,
219 bootstrap_count: usize,
220 },
221 LocalScanningStarted,
223 LocalCandidateDiscovered { candidate: CandidateAddress },
225 LocalScanningCompleted {
227 candidate_count: usize,
228 duration: Duration,
229 },
230 ServerReflexiveDiscoveryStarted { bootstrap_count: usize },
232 ServerReflexiveCandidateDiscovered {
234 candidate: CandidateAddress,
235 bootstrap_node: SocketAddr,
236 },
237 BootstrapQueryFailed {
239 bootstrap_node: SocketAddr,
241 error: String,
243 },
244 PortAllocationDetected {
247 port: u16,
248 source_address: SocketAddr,
249 bootstrap_node: BootstrapNodeId,
250 timestamp: Instant,
251 },
252 DiscoveryCompleted {
254 candidate_count: usize,
255 total_duration: Duration,
256 success_rate: f64,
257 },
258 DiscoveryFailed {
260 error: DiscoveryError,
262 partial_results: Vec<CandidateAddress>,
264 },
265 PathValidationRequested {
267 candidate_id: CandidateId,
268 candidate_address: SocketAddr,
269 challenge_token: u64,
270 },
271 PathValidationResponse {
273 candidate_id: CandidateId,
274 candidate_address: SocketAddr,
275 challenge_token: u64,
276 rtt: Duration,
277 },
278}
279
280#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
282pub struct BootstrapNodeId(pub u64);
283
284struct PendingValidation {
286 candidate_address: SocketAddr,
288 challenge_token: u64,
290 started_at: Instant,
292 #[allow(dead_code)]
294 attempts: u32,
295}
296
297#[derive(Debug, Clone, PartialEq, Eq)]
299pub enum QueryState {
300 Pending { sent_at: Instant, attempts: u32 },
302 Completed,
304 Failed,
306}
307
308#[derive(Debug, Clone, PartialEq)]
310pub struct ServerReflexiveResponse {
311 pub bootstrap_node: BootstrapNodeId,
312 pub observed_address: SocketAddr,
313 pub response_time: Duration,
314 pub timestamp: Instant,
315}
316
317#[derive(Debug, Clone, PartialEq)]
321pub struct PortAllocationEvent {
322 pub port: u16,
323 pub timestamp: Instant,
324 pub source_address: SocketAddr,
325}
326
327#[derive(Debug, Clone, PartialEq)]
329pub struct PortAllocationPattern {
330 pub pattern_type: AllocationPatternType,
331 pub base_port: u16,
332 pub stride: u16,
333 pub pool_boundaries: Option<(u16, u16)>,
334 pub confidence: f64,
335}
336
337#[derive(Debug, Clone, PartialEq, Eq)]
339pub enum AllocationPatternType {
340 Sequential,
342 FixedStride,
344 Random,
346 PoolBased,
348 TimeBased,
350 Unknown,
352}
353
354#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
358pub struct CandidateId(pub u64);
359
360#[derive(Debug, Clone, PartialEq)]
362pub enum ValidationResult {
363 Valid { rtt: Duration },
364 Invalid { reason: String },
365 Timeout,
366 Pending,
367}
368
369#[derive(Debug, Clone, PartialEq)]
371pub struct ValidatedCandidate {
372 pub id: CandidateId,
373 pub address: SocketAddr,
374 pub source: DiscoverySourceType,
375 pub priority: u32,
376 pub rtt: Option<Duration>,
377 pub reliability_score: f64,
378}
379
380impl ValidatedCandidate {
381 pub fn to_candidate_address(&self) -> CandidateAddress {
383 CandidateAddress {
384 address: self.address,
385 priority: self.priority,
386 source: convert_to_nat_source(self.source),
387 state: CandidateState::Valid,
388 }
389 }
390}
391
392#[derive(Debug)]
394#[allow(dead_code)]
395pub(crate) struct DiscoverySessionState {
396 pub peer_id: PeerId,
397 pub session_id: u64,
398 pub started_at: Instant,
399 pub discovered_candidates: Vec<DiscoveryCandidate>,
400 pub statistics: DiscoveryStatistics,
401 pub allocation_history: VecDeque<PortAllocationEvent>,
402}
403
404#[derive(Debug, Default, Clone)]
406pub struct DiscoveryStatistics {
407 pub local_candidates_found: u32,
408 pub server_reflexive_candidates_found: u32,
409 pub predicted_candidates_generated: u32,
410 pub bootstrap_queries_sent: u32,
411 pub bootstrap_queries_successful: u32,
412 pub total_discovery_time: Option<Duration>,
413 pub average_bootstrap_rtt: Option<Duration>,
414 pub invalid_addresses_rejected: u32,
415}
416
417#[derive(Debug, Clone, PartialEq, Eq)]
422pub enum DiscoveryError {
423 NoLocalInterfaces,
428
429 AllBootstrapsFailed,
434
435 DiscoveryTimeout,
440
441 InsufficientCandidates {
446 found: usize,
448 required: usize,
450 },
451
452 NetworkError(String),
457 ConfigurationError(String),
459 InternalError(String),
461}
462
463#[derive(Debug, Clone, PartialEq, Eq)]
465pub enum FallbackStrategy {
466 UseCachedResults,
468 RetryWithRelaxedParams,
470 UseMinimalCandidates,
472 EnableRelayFallback,
474}
475
476impl Default for DiscoveryConfig {
477 fn default() -> Self {
478 Self {
479 total_timeout: Duration::from_secs(30),
480 local_scan_timeout: Duration::from_secs(2),
481 bootstrap_query_timeout: Duration::from_secs(5),
482 max_query_retries: 3,
483 max_candidates: 8,
484 enable_symmetric_prediction: true,
485 min_bootstrap_consensus: 2,
486 interface_cache_ttl: Duration::from_secs(60),
487 server_reflexive_cache_ttl: Duration::from_secs(300),
488 bound_address: None,
489 }
490 }
491}
492
493impl DiscoverySession {
494 fn new(peer_id: PeerId, _config: &DiscoveryConfig) -> Self {
496 Self {
497 peer_id,
498 session_id: rand::random(),
499 current_phase: DiscoveryPhase::Idle,
500 started_at: Instant::now(),
501 discovered_candidates: Vec::new(),
502 statistics: DiscoveryStatistics::default(),
503 allocation_history: VecDeque::new(),
504 }
505 }
506}
507
508impl CandidateDiscoveryManager {
509 pub fn new(config: DiscoveryConfig) -> Self {
511 let interface_discovery =
512 Arc::new(std::sync::Mutex::new(create_platform_interface_discovery()));
513 let cache = DiscoveryCache::new(&config);
514 let local_cache_duration = config.interface_cache_ttl;
515
516 Self {
517 config,
518 interface_discovery,
519 cache,
520 active_sessions: HashMap::new(),
521 cached_local_candidates: None,
522 local_cache_duration,
523 pending_validations: HashMap::new(),
524 }
525 }
526
527 pub fn set_bound_address(&mut self, address: SocketAddr) {
529 self.config.bound_address = Some(address);
530 self.cached_local_candidates = None;
532 }
533
534 #[allow(clippy::panic)]
536 pub fn discover_local_candidates(&mut self) -> Result<Vec<ValidatedCandidate>, DiscoveryError> {
537 let mut interface_discovery = self
539 .interface_discovery
540 .lock()
541 .map_err(|e| DiscoveryError::NetworkError(format!("Mutex poisoned: {e}")))?;
542 interface_discovery.start_scan().map_err(|e| {
543 DiscoveryError::NetworkError(format!("Failed to start interface scan: {e}"))
544 })?;
545
546 let start = Instant::now();
548 let timeout = Duration::from_secs(2);
549
550 loop {
551 if start.elapsed() > timeout {
552 return Err(DiscoveryError::DiscoveryTimeout);
553 }
554
555 if let Some(interfaces) = self
556 .interface_discovery
557 .lock()
558 .unwrap_or_else(|_| panic!("interface discovery mutex should be valid"))
559 .check_scan_complete()
560 {
561 let mut candidates = Vec::new();
563
564 for interface in interfaces {
565 for addr in interface.addresses {
566 candidates.push(ValidatedCandidate {
567 id: CandidateId(rand::random()),
568 address: addr,
569 source: DiscoverySourceType::Local,
570 priority: 50000, rtt: None,
572 reliability_score: 1.0,
573 });
574 }
575 }
576
577 if candidates.is_empty() {
578 return Err(DiscoveryError::NoLocalInterfaces);
579 }
580
581 return Ok(candidates);
582 }
583
584 std::thread::sleep(Duration::from_millis(10));
586 }
587 }
588
589 pub fn start_discovery(
591 &mut self,
592 peer_id: PeerId,
593 _bootstrap_nodes: Vec<BootstrapNode>,
594 ) -> Result<(), DiscoveryError> {
595 if self.active_sessions.contains_key(&peer_id) {
597 return Err(DiscoveryError::InternalError(format!(
598 "Discovery already in progress for peer {peer_id:?}"
599 )));
600 }
601
602 info!("Starting candidate discovery for peer {:?}", peer_id);
603
604 let mut session = DiscoverySession::new(peer_id, &self.config);
606
607 session.current_phase = DiscoveryPhase::LocalInterfaceScanning {
609 started_at: Instant::now(),
610 };
611
612 self.active_sessions.insert(peer_id, session);
614
615 Ok(())
616 }
617
618 pub fn poll(&mut self, now: Instant) -> Vec<DiscoveryEvent> {
620 let mut all_events = Vec::new();
621 let mut completed_sessions = Vec::new();
622
623 let mut local_scan_events = Vec::new();
626 for (peer_id, session) in &mut self.active_sessions {
627 if let DiscoveryPhase::LocalInterfaceScanning { started_at } = &session.current_phase {
628 if started_at.elapsed() > self.config.local_scan_timeout {
630 local_scan_events.push((
631 *peer_id,
632 DiscoveryEvent::LocalScanningCompleted {
633 candidate_count: 0,
634 duration: started_at.elapsed(),
635 },
636 ));
637 }
638 }
639 }
640
641 for (peer_id, event) in local_scan_events {
643 all_events.push(event);
644 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
645 session.current_phase = DiscoveryPhase::Completed {
647 final_candidates: session
648 .discovered_candidates
649 .iter()
650 .map(|dc| ValidatedCandidate {
651 id: CandidateId(0),
652 address: dc.address,
653 source: dc.source,
654 priority: dc.priority,
655 rtt: None,
656 reliability_score: 1.0,
657 })
658 .collect(),
659 completion_time: now,
660 };
661
662 all_events.push(DiscoveryEvent::DiscoveryCompleted {
663 candidate_count: session.discovered_candidates.len(),
664 total_duration: now.duration_since(session.started_at),
665 success_rate: 1.0,
666 });
667
668 completed_sessions.push(peer_id);
669 }
670 }
671
672 for peer_id in completed_sessions {
674 self.active_sessions.remove(&peer_id);
675 debug!("Removed completed discovery session for peer {:?}", peer_id);
676 }
677
678 all_events
679 }
680
681 pub fn get_status(&self) -> DiscoveryStatus {
683 DiscoveryStatus {
685 phase: DiscoveryPhase::Idle,
686 discovered_candidates: Vec::new(),
687 statistics: DiscoveryStatistics::default(),
688 elapsed_time: Duration::from_secs(0),
689 }
690 }
691
692 pub fn is_complete(&self) -> bool {
694 self.active_sessions.values().all(|session| {
696 matches!(
697 session.current_phase,
698 DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. }
699 )
700 })
701 }
702
703 pub fn get_results(&self) -> Option<DiscoveryResults> {
705 if self.active_sessions.is_empty() {
707 return None;
708 }
709
710 let mut all_candidates = Vec::new();
712 let mut latest_completion = Instant::now();
713 let mut combined_stats = DiscoveryStatistics::default();
714
715 for session in self.active_sessions.values() {
716 match &session.current_phase {
717 DiscoveryPhase::Completed {
718 final_candidates,
719 completion_time,
720 } => {
721 all_candidates.extend(final_candidates.clone());
723 latest_completion = *completion_time;
724 combined_stats.local_candidates_found +=
726 session.statistics.local_candidates_found;
727 combined_stats.server_reflexive_candidates_found +=
728 session.statistics.server_reflexive_candidates_found;
729 combined_stats.predicted_candidates_generated +=
730 session.statistics.predicted_candidates_generated;
731 combined_stats.bootstrap_queries_sent +=
732 session.statistics.bootstrap_queries_sent;
733 combined_stats.bootstrap_queries_successful +=
734 session.statistics.bootstrap_queries_successful;
735 }
736 DiscoveryPhase::Failed { .. } => {
737 let validated: Vec<ValidatedCandidate> = session
740 .discovered_candidates
741 .iter()
742 .enumerate()
743 .map(|(idx, dc)| ValidatedCandidate {
744 id: CandidateId(idx as u64),
745 address: dc.address,
746 source: dc.source,
747 priority: dc.priority,
748 rtt: None,
749 reliability_score: 0.5, })
751 .collect();
752 all_candidates.extend(validated);
753 }
754 _ => {}
755 }
756 }
757
758 if all_candidates.is_empty() {
759 None
760 } else {
761 Some(DiscoveryResults {
762 candidates: all_candidates,
763 completion_time: latest_completion,
764 statistics: combined_stats,
765 })
766 }
767 }
768
769 pub fn get_candidates_for_peer(&self, peer_id: PeerId) -> Vec<CandidateAddress> {
771 if let Some(session) = self.active_sessions.get(&peer_id) {
773 session
775 .discovered_candidates
776 .iter()
777 .map(|c| c.to_candidate_address())
778 .collect()
779 } else {
780 debug!("No active discovery session found for peer {:?}", peer_id);
782 Vec::new()
783 }
784 }
785
786 #[allow(dead_code)]
789 fn poll_session_local_scanning(
790 &mut self,
791 session: &mut DiscoverySession,
792 started_at: Instant,
793 now: Instant,
794 events: &mut Vec<DiscoveryEvent>,
795 ) {
796 if let Some((cache_time, ref cached_candidates)) = self.cached_local_candidates {
798 if cache_time.elapsed() < self.local_cache_duration {
799 debug!(
801 "Using cached local candidates for peer {:?}",
802 session.peer_id
803 );
804 self.process_cached_local_candidates(
805 session,
806 cached_candidates.clone(),
807 events,
808 now,
809 );
810 return;
811 }
812 }
813
814 if started_at.elapsed().as_millis() < 10 {
817 let scan_result = match self.interface_discovery.lock() {
818 Ok(mut interface_discovery) => interface_discovery.start_scan(),
819 Err(e) => {
820 error!("Interface discovery mutex poisoned: {}", e);
821 return;
822 }
823 };
824 match scan_result {
825 Ok(()) => {
826 debug!(
827 "Started local interface scan for peer {:?}",
828 session.peer_id
829 );
830 events.push(DiscoveryEvent::LocalScanningStarted);
831 }
832 Err(e) => {
833 error!("Failed to start interface scan: {}", e);
834 return;
835 }
836 }
837 }
838
839 if started_at.elapsed() > self.config.local_scan_timeout {
841 warn!(
842 "Local interface scanning timeout for peer {:?}",
843 session.peer_id
844 );
845 self.handle_session_local_scan_timeout(session, events, now);
846 return;
847 }
848
849 let scan_complete_result = self
851 .interface_discovery
852 .lock()
853 .unwrap_or_else(|_| panic!("interface discovery mutex should be valid"))
854 .check_scan_complete();
855 if let Some(interfaces) = scan_complete_result {
856 self.process_session_local_interfaces(session, interfaces, events, now);
857 }
858 }
859
860 #[allow(dead_code)]
861 fn process_session_local_interfaces(
862 &mut self,
863 session: &mut DiscoverySession,
864 interfaces: Vec<NetworkInterface>,
865 events: &mut Vec<DiscoveryEvent>,
866 now: Instant,
867 ) {
868 debug!(
869 "Processing {} network interfaces for peer {:?}",
870 interfaces.len(),
871 session.peer_id
872 );
873
874 let mut validated_candidates = Vec::new();
875
876 if let Some(bound_addr) = self.config.bound_address {
878 if self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback() {
879 let candidate = DiscoveryCandidate {
880 address: bound_addr,
881 priority: 60000, source: DiscoverySourceType::Local,
883 state: CandidateState::New,
884 };
885
886 session.discovered_candidates.push(candidate.clone());
887 session.statistics.local_candidates_found += 1;
888
889 validated_candidates.push(ValidatedCandidate {
891 id: CandidateId(rand::random()),
892 address: bound_addr,
893 source: DiscoverySourceType::Local,
894 priority: candidate.priority,
895 rtt: None,
896 reliability_score: 1.0,
897 });
898
899 events.push(DiscoveryEvent::LocalCandidateDiscovered {
900 candidate: candidate.to_candidate_address(),
901 });
902
903 debug!(
904 "Added bound address {} as local candidate for peer {:?}",
905 bound_addr, session.peer_id
906 );
907 }
908 }
909
910 for interface in &interfaces {
912 for address in &interface.addresses {
913 if Some(*address) == self.config.bound_address {
915 continue;
916 }
917
918 if self.is_valid_local_address(address) {
919 let candidate = DiscoveryCandidate {
920 address: *address,
921 priority: self.calculate_local_priority(address, interface),
922 source: DiscoverySourceType::Local,
923 state: CandidateState::New,
924 };
925
926 session.discovered_candidates.push(candidate.clone());
927 session.statistics.local_candidates_found += 1;
928
929 validated_candidates.push(ValidatedCandidate {
931 id: CandidateId(rand::random()),
932 address: *address,
933 source: DiscoverySourceType::Local,
934 priority: candidate.priority,
935 rtt: None,
936 reliability_score: 1.0,
937 });
938
939 events.push(DiscoveryEvent::LocalCandidateDiscovered {
940 candidate: candidate.to_candidate_address(),
941 });
942 }
943 }
944 }
945
946 self.cached_local_candidates = Some((now, validated_candidates));
948
949 events.push(DiscoveryEvent::LocalScanningCompleted {
950 candidate_count: session.statistics.local_candidates_found as usize,
951 duration: now.duration_since(session.started_at),
952 });
953
954 self.complete_session_discovery_with_local_candidates(session, events, now);
956 }
957
958 #[allow(dead_code)]
959 fn process_cached_local_candidates(
960 &mut self,
961 session: &mut DiscoverySession,
962 mut cached_candidates: Vec<ValidatedCandidate>,
963 events: &mut Vec<DiscoveryEvent>,
964 now: Instant,
965 ) {
966 if let Some(bound_addr) = self.config.bound_address {
968 let has_bound_addr = cached_candidates.iter().any(|c| c.address == bound_addr);
969 if !has_bound_addr
970 && (self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback())
971 {
972 cached_candidates.insert(
973 0,
974 ValidatedCandidate {
975 id: CandidateId(rand::random()),
976 address: bound_addr,
977 source: DiscoverySourceType::Local,
978 priority: 60000, rtt: None,
980 reliability_score: 1.0,
981 },
982 );
983 }
984 }
985
986 debug!(
987 "Using {} cached local candidates for peer {:?}",
988 cached_candidates.len(),
989 session.peer_id
990 );
991
992 for validated in cached_candidates {
993 let candidate = DiscoveryCandidate {
994 address: validated.address,
995 priority: validated.priority,
996 source: validated.source,
997 state: CandidateState::New,
998 };
999
1000 session.discovered_candidates.push(candidate.clone());
1001 session.statistics.local_candidates_found += 1;
1002
1003 events.push(DiscoveryEvent::LocalCandidateDiscovered {
1004 candidate: candidate.to_candidate_address(),
1005 });
1006 }
1007
1008 events.push(DiscoveryEvent::LocalScanningCompleted {
1009 candidate_count: session.statistics.local_candidates_found as usize,
1010 duration: now.duration_since(session.started_at),
1011 });
1012
1013 self.complete_session_discovery_with_local_candidates(session, events, now);
1015 }
1016
1017 #[allow(dead_code)]
1024 fn start_session_candidate_validation(
1025 &mut self,
1026 session: &mut DiscoverySession,
1027 _events: &mut Vec<DiscoveryEvent>,
1028 now: Instant,
1029 ) {
1030 debug!(
1031 "Starting candidate validation for {} candidates",
1032 session.discovered_candidates.len()
1033 );
1034
1035 session.current_phase = DiscoveryPhase::CandidateValidation {
1036 started_at: now,
1037 validation_results: HashMap::new(),
1038 };
1039 }
1040
1041 #[allow(dead_code)]
1043 fn start_path_validation(
1044 &mut self,
1045 candidate_id: CandidateId,
1046 candidate_address: SocketAddr,
1047 now: Instant,
1048 events: &mut Vec<DiscoveryEvent>,
1049 ) {
1050 debug!(
1051 "Starting QUIC path validation for candidate {} at {}",
1052 candidate_id.0, candidate_address
1053 );
1054
1055 let challenge_token: u64 = rand::random();
1057
1058 self.pending_validations.insert(
1060 candidate_id,
1061 PendingValidation {
1062 candidate_address,
1063 challenge_token,
1064 started_at: now,
1065 attempts: 1,
1066 },
1067 );
1068
1069 events.push(DiscoveryEvent::PathValidationRequested {
1071 candidate_id,
1072 candidate_address,
1073 challenge_token,
1074 });
1075
1076 debug!(
1077 "PATH_CHALLENGE {:08x} requested for candidate {} at {}",
1078 challenge_token, candidate_id.0, candidate_address
1079 );
1080 }
1081
1082 pub fn handle_path_response(
1084 &mut self,
1085 candidate_address: SocketAddr,
1086 challenge_token: u64,
1087 now: Instant,
1088 ) -> Option<DiscoveryEvent> {
1089 let candidate_id = self
1091 .pending_validations
1092 .iter()
1093 .find(|(_, validation)| {
1094 validation.candidate_address == candidate_address
1095 && validation.challenge_token == challenge_token
1096 })
1097 .map(|(id, _)| *id)?;
1098
1099 let validation = self.pending_validations.remove(&candidate_id)?;
1101 let rtt = now.duration_since(validation.started_at);
1102
1103 debug!(
1104 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1105 candidate_id.0, candidate_address, rtt
1106 );
1107
1108 for session in self.active_sessions.values_mut() {
1110 if let Some(candidate) = session
1111 .discovered_candidates
1112 .iter_mut()
1113 .find(|c| c.address == candidate_address)
1114 {
1115 candidate.state = CandidateState::Valid;
1116 break;
1118 }
1119 }
1120
1121 Some(DiscoveryEvent::PathValidationResponse {
1122 candidate_id,
1123 candidate_address,
1124 challenge_token,
1125 rtt,
1126 })
1127 }
1128
1129 #[allow(dead_code)]
1131 fn simulate_path_validation(
1132 &mut self,
1133 candidate_id: CandidateId,
1134 candidate_address: SocketAddr,
1135 _now: Instant,
1136 ) {
1137 let is_local = candidate_address.ip().is_loopback()
1139 || (candidate_address.ip().is_ipv4()
1140 && candidate_address.ip().to_string().starts_with("192.168."))
1141 || (candidate_address.ip().is_ipv4()
1142 && candidate_address.ip().to_string().starts_with("10."))
1143 || (candidate_address.ip().is_ipv4()
1144 && candidate_address.ip().to_string().starts_with("172."));
1145
1146 let is_server_reflexive = !is_local && !candidate_address.ip().is_unspecified();
1147
1148 debug!(
1151 "Simulated path validation for candidate {} at {} - local: {}, server_reflexive: {}",
1152 candidate_id.0, candidate_address, is_local, is_server_reflexive
1153 );
1154 }
1155
1156 #[allow(dead_code)]
1158 fn simulate_validation_result(&self, address: &SocketAddr) -> ValidationResult {
1159 let is_local = address.ip().is_loopback()
1160 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("192.168."))
1161 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("10."))
1162 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("172."));
1163
1164 if is_local {
1165 ValidationResult::Valid {
1167 rtt: Duration::from_millis(1),
1168 }
1169 } else if address.ip().is_unspecified() {
1170 ValidationResult::Invalid {
1172 reason: "Unspecified address".to_string(),
1173 }
1174 } else {
1175 ValidationResult::Valid {
1177 rtt: Duration::from_millis(50 + (address.port() % 100) as u64),
1178 }
1179 }
1180 }
1181
1182 #[allow(dead_code)]
1184 fn calculate_reliability_score(&self, candidate: &DiscoveryCandidate, rtt: Duration) -> f64 {
1185 let mut score: f64 = 0.5; match candidate.source {
1189 DiscoverySourceType::Local => score += 0.3, DiscoverySourceType::ServerReflexive => score += 0.2, DiscoverySourceType::Predicted => score += 0.1, }
1193
1194 let rtt_ms = rtt.as_millis() as f64;
1196 if rtt_ms < 10.0 {
1197 score += 0.2;
1198 } else if rtt_ms < 50.0 {
1199 score += 0.1;
1200 } else if rtt_ms > 200.0 {
1201 score -= 0.1;
1202 }
1203
1204 if candidate.address.ip().is_ipv6() {
1206 score += 0.05; }
1208
1209 score.max(0.0).min(1.0)
1211 }
1212
1213 #[allow(dead_code)]
1216 fn handle_session_timeout(
1217 &mut self,
1218 session: &mut DiscoverySession,
1219 events: &mut Vec<DiscoveryEvent>,
1220 now: Instant,
1221 ) {
1222 let error = DiscoveryError::DiscoveryTimeout;
1223 let partial_results = session
1224 .discovered_candidates
1225 .iter()
1226 .map(|c| c.to_candidate_address())
1227 .collect();
1228
1229 warn!(
1230 "Discovery failed for peer {:?}: discovery process timed out (found {} partial candidates)",
1231 session.peer_id,
1232 session.discovered_candidates.len()
1233 );
1234 events.push(DiscoveryEvent::DiscoveryFailed {
1235 error: error.clone(),
1236 partial_results,
1237 });
1238
1239 session.current_phase = DiscoveryPhase::Failed {
1240 error,
1241 failed_at: now,
1242 fallback_options: vec![FallbackStrategy::UseCachedResults],
1243 };
1244 }
1245
1246 #[allow(dead_code)]
1247 fn handle_session_local_scan_timeout(
1248 &mut self,
1249 session: &mut DiscoverySession,
1250 events: &mut Vec<DiscoveryEvent>,
1251 now: Instant,
1252 ) {
1253 warn!(
1254 "Local interface scan timeout for peer {:?}, proceeding with available candidates",
1255 session.peer_id
1256 );
1257
1258 events.push(DiscoveryEvent::LocalScanningCompleted {
1259 candidate_count: session.statistics.local_candidates_found as usize,
1260 duration: now.duration_since(session.started_at),
1261 });
1262
1263 self.complete_session_discovery_with_local_candidates(session, events, now);
1265 }
1266
1267 #[allow(dead_code)]
1272 fn poll_session_candidate_validation(
1273 &mut self,
1274 session: &mut DiscoverySession,
1275 _started_at: Instant,
1276 _validation_results: &HashMap<CandidateId, ValidationResult>,
1277 now: Instant,
1278 events: &mut Vec<DiscoveryEvent>,
1279 ) {
1280 self.complete_session_discovery_with_local_candidates(session, events, now);
1283 }
1284
1285 #[allow(dead_code)]
1286 fn complete_session_discovery_with_local_candidates(
1287 &mut self,
1288 session: &mut DiscoverySession,
1289 events: &mut Vec<DiscoveryEvent>,
1290 now: Instant,
1291 ) {
1292 let duration = now.duration_since(session.started_at);
1294 session.statistics.total_discovery_time = Some(duration);
1295
1296 let success_rate = if session.statistics.local_candidates_found > 0 {
1297 1.0
1298 } else {
1299 0.0
1300 };
1301
1302 let validated_candidates: Vec<ValidatedCandidate> = session
1304 .discovered_candidates
1305 .iter()
1306 .map(|dc| ValidatedCandidate {
1307 id: CandidateId(rand::random()),
1308 address: dc.address,
1309 source: dc.source,
1310 priority: dc.priority,
1311 rtt: None,
1312 reliability_score: 1.0,
1313 })
1314 .collect();
1315
1316 events.push(DiscoveryEvent::DiscoveryCompleted {
1317 candidate_count: validated_candidates.len(),
1318 total_duration: duration,
1319 success_rate,
1320 });
1321
1322 session.current_phase = DiscoveryPhase::Completed {
1323 final_candidates: validated_candidates,
1324 completion_time: now,
1325 };
1326
1327 info!(
1328 "Discovery completed with {} local candidates for peer {:?}",
1329 session.discovered_candidates.len(),
1330 session.peer_id
1331 );
1332 }
1333
1334 #[allow(dead_code)]
1335 fn is_valid_local_address(&self, address: &SocketAddr) -> bool {
1336 use crate::nat_traversal_api::CandidateAddress;
1338
1339 if let Err(e) = CandidateAddress::validate_address(address) {
1340 debug!("Address {} failed validation: {}", address, e);
1341 return false;
1342 }
1343
1344 match address.ip() {
1345 IpAddr::V4(ipv4) => {
1346 #[cfg(test)]
1348 if ipv4.is_loopback() {
1349 return true;
1350 }
1351 !ipv4.is_loopback()
1354 && !ipv4.is_unspecified()
1355 && !ipv4.is_broadcast()
1356 && !ipv4.is_multicast()
1357 && !ipv4.is_documentation()
1358 }
1359 IpAddr::V6(ipv6) => {
1360 #[cfg(test)]
1362 if ipv6.is_loopback() {
1363 return true;
1364 }
1365 let segments = ipv6.segments();
1367 let is_documentation = segments[0] == 0x2001 && segments[1] == 0x0db8;
1368
1369 !ipv6.is_loopback()
1370 && !ipv6.is_unspecified()
1371 && !ipv6.is_multicast()
1372 && !is_documentation
1373 }
1374 }
1375 }
1376
1377 #[allow(dead_code)]
1380 fn calculate_local_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
1381 let mut priority = 100; match address.ip() {
1384 IpAddr::V4(ipv4) => {
1385 if ipv4.is_private() {
1386 priority += 50; }
1388 }
1389 IpAddr::V6(ipv6) => {
1390 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
1393 let segments = ipv6.segments();
1394 if segments[0] & 0xE000 == 0x2000 {
1395 priority += 60;
1397 } else if segments[0] & 0xFFC0 == 0xFE80 {
1398 priority += 20;
1400 } else if segments[0] & 0xFE00 == 0xFC00 {
1401 priority += 40;
1403 } else {
1404 priority += 30;
1406 }
1407 }
1408
1409 priority += 10; }
1412 }
1413
1414 if interface.is_wireless {
1415 priority -= 10; }
1417
1418 priority
1419 }
1420
1421 #[allow(dead_code)]
1422 fn calculate_server_reflexive_priority(&self, response: &ServerReflexiveResponse) -> u32 {
1423 let mut priority = 200; if response.response_time < Duration::from_millis(50) {
1427 priority += 20;
1428 } else if response.response_time > Duration::from_millis(200) {
1429 priority -= 10;
1430 }
1431
1432 let age_bonus = if response.timestamp.elapsed().as_secs() < 60 {
1434 20
1435 } else {
1436 0
1437 };
1438 priority += age_bonus;
1439
1440 priority
1441 }
1442
1443 #[allow(dead_code)]
1444 fn should_transition_to_prediction(
1445 &self,
1446 responses: &[ServerReflexiveResponse],
1447 _now: Instant,
1448 ) -> bool {
1449 responses.len() >= self.config.min_bootstrap_consensus.max(1)
1450 }
1451
1452 #[allow(dead_code)]
1453 #[allow(clippy::panic)]
1454 fn calculate_consensus_address(&self, responses: &[ServerReflexiveResponse]) -> SocketAddr {
1455 let mut address_counts: HashMap<SocketAddr, usize> = HashMap::new();
1457
1458 for response in responses {
1459 *address_counts.entry(response.observed_address).or_insert(0) += 1;
1460 }
1461
1462 address_counts
1463 .into_iter()
1464 .max_by_key(|(_, count)| *count)
1465 .map(|(addr, _)| addr)
1466 .unwrap_or_else(|| {
1467 "0.0.0.0:0"
1468 .parse()
1469 .unwrap_or_else(|_| panic!("hardcoded fallback address should be valid"))
1470 })
1471 }
1472
1473 #[allow(dead_code)]
1475 fn calculate_prediction_accuracy(
1476 &self,
1477 pattern: &PortAllocationPattern,
1478 history: &VecDeque<PortAllocationEvent>,
1479 ) -> f64 {
1480 if history.len() < 3 {
1481 return 0.3; }
1483
1484 let recent_ports: Vec<u16> = history
1486 .iter()
1487 .rev()
1488 .take(10)
1489 .map(|event| event.port)
1490 .collect();
1491
1492 let mut correct_predictions = 0;
1493 let total_predictions = recent_ports.len().saturating_sub(1);
1494
1495 if total_predictions == 0 {
1496 return 0.3;
1497 }
1498
1499 match pattern.pattern_type {
1500 AllocationPatternType::Sequential => {
1501 for i in 1..recent_ports.len() {
1503 if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == 1 {
1504 correct_predictions += 1;
1505 }
1506 }
1507 }
1508 AllocationPatternType::FixedStride => {
1509 for i in 1..recent_ports.len() {
1511 if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == pattern.stride {
1512 correct_predictions += 1;
1513 }
1514 }
1515 }
1516 AllocationPatternType::PoolBased => {
1517 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1519 for port in &recent_ports {
1520 if *port >= min_port && *port <= max_port {
1521 correct_predictions += 1;
1522 }
1523 }
1524 }
1525 }
1526 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1527 if recent_ports.len() >= 3 {
1529 let mean = recent_ports.iter().map(|&p| p as f64).sum::<f64>()
1530 / recent_ports.len() as f64;
1531 let variance = recent_ports
1532 .iter()
1533 .map(|&p| (p as f64 - mean).powi(2))
1534 .sum::<f64>()
1535 / recent_ports.len() as f64;
1536
1537 let normalized_variance = (variance / 10000.0).min(1.0); return 0.2 + (1.0 - normalized_variance) * 0.3; }
1541 }
1542 AllocationPatternType::TimeBased => {
1543 if history.len() >= 2 {
1545 let time_diffs: Vec<Duration> = history
1546 .iter()
1547 .collect::<Vec<_>>()
1548 .windows(2)
1549 .map(|w| w[1].timestamp.duration_since(w[0].timestamp))
1550 .collect();
1551
1552 if !time_diffs.is_empty() {
1553 let avg_diff =
1554 time_diffs.iter().sum::<Duration>() / time_diffs.len() as u32;
1555 let variance = time_diffs
1556 .iter()
1557 .map(|d| d.as_millis().abs_diff(avg_diff.as_millis()) as f64)
1558 .sum::<f64>()
1559 / time_diffs.len() as f64;
1560
1561 let normalized_variance = (variance / 1000.0).min(1.0); return 0.3 + (1.0 - normalized_variance) * 0.4; }
1565 }
1566 }
1567 }
1568
1569 let accuracy = if total_predictions > 0 {
1571 correct_predictions as f64 / total_predictions as f64
1572 } else {
1573 0.3
1574 };
1575
1576 let confidence_adjusted_accuracy = accuracy * pattern.confidence;
1578
1579 confidence_adjusted_accuracy.max(0.2).min(0.9)
1581 }
1582
1583 pub fn accept_quic_discovered_address(
1586 &mut self,
1587 peer_id: PeerId,
1588 discovered_address: SocketAddr,
1589 ) -> Result<(), DiscoveryError> {
1590 let priority = self.calculate_quic_discovered_priority(&discovered_address);
1592
1593 let session = self.active_sessions.get_mut(&peer_id).ok_or_else(|| {
1595 DiscoveryError::InternalError(format!(
1596 "No active discovery session for peer {peer_id:?}"
1597 ))
1598 })?;
1599
1600 let already_exists = session
1602 .discovered_candidates
1603 .iter()
1604 .any(|c| c.address == discovered_address);
1605
1606 if already_exists {
1607 debug!(
1608 "QUIC-discovered address {} already in candidates",
1609 discovered_address
1610 );
1611 return Ok(());
1612 }
1613
1614 info!("Accepting QUIC-discovered address: {}", discovered_address);
1615
1616 let candidate = DiscoveryCandidate {
1618 address: discovered_address,
1619 priority,
1620 source: DiscoverySourceType::ServerReflexive,
1621 state: CandidateState::New,
1622 };
1623
1624 session.discovered_candidates.push(candidate);
1626 session.statistics.server_reflexive_candidates_found += 1;
1627
1628 Ok(())
1629 }
1630
1631 fn calculate_quic_discovered_priority(&self, address: &SocketAddr) -> u32 {
1633 let mut priority = 255; match address.ip() {
1638 IpAddr::V4(ipv4) => {
1639 if ipv4.is_private() {
1640 priority -= 10; } else if ipv4.is_loopback() {
1642 priority -= 20; }
1644 }
1646 IpAddr::V6(ipv6) => {
1647 priority += 10; if ipv6.is_loopback() {
1651 priority -= 30; } else if ipv6.is_multicast() {
1653 priority -= 40; } else if ipv6.is_unspecified() {
1655 priority -= 50; } else {
1657 let segments = ipv6.segments();
1659 if segments[0] & 0xFFC0 == 0xFE80 {
1660 priority -= 30; } else if segments[0] & 0xFE00 == 0xFC00 {
1663 priority -= 10; }
1666 }
1668 }
1669 }
1670
1671 priority
1672 }
1673
1674 #[allow(clippy::panic)]
1676 pub fn poll_discovery_progress(&mut self, peer_id: PeerId) -> Vec<DiscoveryEvent> {
1677 let mut events = Vec::new();
1678
1679 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
1680 for candidate in &session.discovered_candidates {
1682 if matches!(candidate.state, CandidateState::New) {
1683 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1684 candidate: candidate.to_candidate_address(),
1685 bootstrap_node: "0.0.0.0:0".parse().unwrap_or_else(|_| {
1686 panic!("hardcoded placeholder address should be valid")
1687 }), });
1689 }
1690 }
1691
1692 for candidate in &mut session.discovered_candidates {
1694 if matches!(candidate.state, CandidateState::New) {
1695 candidate.state = CandidateState::Validating;
1696 }
1697 }
1698 }
1699
1700 events
1701 }
1702
1703 pub fn get_discovery_status(&self, peer_id: PeerId) -> Option<DiscoveryStatus> {
1705 self.active_sessions.get(&peer_id).map(|session| {
1706 let discovered_candidates = session
1707 .discovered_candidates
1708 .iter()
1709 .map(|c| c.to_candidate_address())
1710 .collect();
1711
1712 DiscoveryStatus {
1713 phase: session.current_phase.clone(),
1714 discovered_candidates,
1715 statistics: session.statistics.clone(),
1716 elapsed_time: session.started_at.elapsed(),
1717 }
1718 })
1719 }
1720}
1721
1722#[derive(Debug, Clone)]
1724pub struct DiscoveryStatus {
1725 pub phase: DiscoveryPhase,
1726 pub discovered_candidates: Vec<CandidateAddress>,
1727 pub statistics: DiscoveryStatistics,
1728 pub elapsed_time: Duration,
1729}
1730
1731#[derive(Debug, Clone)]
1733pub struct DiscoveryResults {
1734 pub candidates: Vec<ValidatedCandidate>,
1735 pub completion_time: Instant,
1736 pub statistics: DiscoveryStatistics,
1737}
1738
1739pub trait NetworkInterfaceDiscovery {
1743 fn start_scan(&mut self) -> Result<(), String>;
1744 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>>;
1745}
1746
1747#[derive(Debug, Clone, PartialEq)]
1749pub struct NetworkInterface {
1750 pub name: String,
1751 pub addresses: Vec<SocketAddr>,
1752 pub is_up: bool,
1753 pub is_wireless: bool,
1754 pub mtu: Option<u16>,
1755}
1756
1757#[derive(Debug)]
1759#[allow(dead_code)]
1760struct BootstrapConnection {
1761 connection: crate::Connection,
1763 address: SocketAddr,
1765 established_at: Instant,
1767 request_id: u64,
1769}
1770
1771#[derive(Debug, Clone)]
1773#[allow(dead_code)]
1774struct AddressObservationRequest {
1775 request_id: u64,
1777 timestamp: u64,
1779 capabilities: u32,
1781}
1782
1783#[cfg(any())]
1785#[derive(Debug)]
1786#[allow(dead_code)]
1787pub(crate) struct ServerReflexiveDiscovery {
1788 config: DiscoveryConfig,
1789 active_queries: HashMap<BootstrapNodeId, QueryState>,
1791 responses: VecDeque<ServerReflexiveResponse>,
1793 query_timeouts: HashMap<BootstrapNodeId, Instant>,
1795 active_connections: HashMap<BootstrapNodeId, BootstrapConnection>,
1797 runtime_handle: Option<tokio::runtime::Handle>,
1799}
1800
1801#[cfg(any())]
1802#[allow(dead_code)]
1803impl ServerReflexiveDiscovery {
1804 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
1805 Self {
1806 config: config.clone(),
1807 active_queries: HashMap::new(),
1808 responses: VecDeque::new(),
1809 query_timeouts: HashMap::new(),
1810 active_connections: HashMap::new(),
1811 runtime_handle: tokio::runtime::Handle::try_current().ok(),
1812 }
1813 }
1814
1815 #[allow(dead_code)]
1816 pub(crate) fn start_queries(
1817 &mut self,
1818 bootstrap_nodes: &[BootstrapNodeId],
1819 now: Instant,
1820 ) -> HashMap<BootstrapNodeId, QueryState> {
1821 debug!(
1822 "Starting server reflexive queries to {} bootstrap nodes",
1823 bootstrap_nodes.len()
1824 );
1825
1826 self.active_queries.clear();
1827 self.query_timeouts.clear();
1828
1829 self.active_connections.clear();
1830
1831 for &node_id in bootstrap_nodes {
1832 let query_state = QueryState::Pending {
1833 sent_at: now,
1834 attempts: 1,
1835 };
1836
1837 self.active_queries.insert(node_id, query_state);
1838 self.query_timeouts
1839 .insert(node_id, now + self.config.bootstrap_query_timeout);
1840
1841 debug!(
1842 "Starting server reflexive query to bootstrap node {:?}",
1843 node_id
1844 );
1845
1846 if let Some(runtime) = &self.runtime_handle {
1848 self.start_quinn_query(node_id, runtime.clone(), now);
1849 } else {
1850 warn!(
1851 "No async runtime available, falling back to simulation for node {:?}",
1852 node_id
1853 );
1854 self.simulate_bootstrap_response(node_id, now);
1855 }
1856 }
1857
1858 self.active_queries.clone()
1859 }
1860
1861 #[allow(dead_code)]
1865 fn start_quinn_query(
1866 &mut self,
1867 node_id: BootstrapNodeId,
1868 _runtime: tokio::runtime::Handle,
1869 now: Instant,
1870 ) {
1871 let request_id = rand::random::<u64>();
1877
1878 debug!(
1879 "Starting Quinn connection to bootstrap node {:?} with request ID {}",
1880 node_id, request_id
1881 );
1882
1883 self.simulate_bootstrap_response(node_id, now);
1893 }
1894
1895 #[allow(dead_code)]
1902 async fn perform_bootstrap_query(
1903 _bootstrap_address: SocketAddr,
1904 _request_id: u64,
1905 _timeout: Duration,
1906 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
1907 Err("Bootstrap query not implemented for low-level API".into())
1911
1912 }
1972
1973 #[allow(dead_code)]
1975 fn create_discovery_request(request_id: u64) -> Vec<u8> {
1976 let mut request = Vec::new();
1977
1978 request.extend_from_slice(&request_id.to_be_bytes());
1983 request.extend_from_slice(
1984 &std::time::SystemTime::now()
1985 .duration_since(std::time::UNIX_EPOCH)
1986 .unwrap_or_default()
1987 .as_millis()
1988 .to_be_bytes()[8..16],
1989 ); request.extend_from_slice(&1u32.to_be_bytes()); debug!(
1993 "Created discovery request: {} bytes, request_id: {}",
1994 request.len(),
1995 request_id
1996 );
1997 request
1998 }
1999
2000 #[allow(dead_code)]
2002 async fn wait_for_add_address_frame(
2003 _connection: &Connection,
2004 _expected_request_id: u64,
2005 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
2006 Err("wait_for_add_address_frame not implemented for low-level API".into())
2009
2010 }
2046
2047 #[allow(dead_code)]
2049 fn create_response_channel(
2050 &self,
2051 ) -> tokio::sync::mpsc::UnboundedSender<ServerReflexiveResponse> {
2052 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
2055 tx
2057 }
2058
2059 #[allow(dead_code)]
2060 pub(crate) fn poll_queries(
2061 &mut self,
2062 _active_queries: &HashMap<BootstrapNodeId, QueryState>,
2063 now: Instant,
2064 ) -> Vec<ServerReflexiveResponse> {
2065 let mut responses = Vec::new();
2066
2067 while let Some(response) = self.responses.pop_front() {
2069 responses.push(response);
2070 }
2071
2072 let mut timed_out_nodes = Vec::new();
2074 for (&node_id, &timeout) in &self.query_timeouts {
2075 if now >= timeout {
2076 timed_out_nodes.push(node_id);
2077 }
2078 }
2079
2080 for node_id in timed_out_nodes {
2082 self.query_timeouts.remove(&node_id);
2083
2084 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
2085 match query_state {
2086 QueryState::Pending { attempts, .. }
2087 if *attempts < self.config.max_query_retries =>
2088 {
2089 *attempts += 1;
2091 let new_timeout = now + self.config.bootstrap_query_timeout;
2092 self.query_timeouts.insert(node_id, new_timeout);
2093
2094 debug!(
2095 "Retrying server reflexive query to bootstrap node {:?} (attempt {})",
2096 node_id, attempts
2097 );
2098
2099 self.simulate_bootstrap_response(node_id, now);
2101 }
2102 _ => {
2103 self.active_queries.insert(node_id, QueryState::Failed);
2105 warn!(
2106 "Server reflexive query to bootstrap node {:?} failed after retries",
2107 node_id
2108 );
2109 }
2110 }
2111 }
2112 }
2113
2114 responses
2115 }
2116
2117 fn simulate_bootstrap_response(&mut self, node_id: BootstrapNodeId, now: Instant) {
2120 let simulated_external_addr = match node_id.0 % 3 {
2122 0 => "203.0.113.1:45678"
2123 .parse()
2124 .expect("Failed to parse hardcoded test address"),
2125 1 => "198.51.100.2:45679"
2126 .parse()
2127 .expect("Failed to parse hardcoded test address"),
2128 _ => "192.0.2.3:45680"
2129 .parse()
2130 .expect("Failed to parse hardcoded test address"),
2131 };
2132
2133 let response = ServerReflexiveResponse {
2134 bootstrap_node: node_id,
2135 observed_address: simulated_external_addr,
2136 response_time: Duration::from_millis(50 + node_id.0 * 10),
2137 timestamp: now,
2138 };
2139
2140 self.responses.push_back(response);
2141
2142 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
2144 *query_state = QueryState::Completed;
2145 }
2146
2147 debug!(
2148 "Received simulated server reflexive response from bootstrap node {:?}: {}",
2149 node_id, simulated_external_addr
2150 );
2151 }
2152}
2153
2154#[cfg(any())]
2156#[derive(Debug)]
2157pub(crate) struct SymmetricNatPredictor {
2158 #[allow(dead_code)]
2159 config: DiscoveryConfig,
2160}
2161
2162#[cfg(any())]
2163impl SymmetricNatPredictor {
2164 #[allow(dead_code)]
2165 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2166 Self {
2167 config: config.clone(),
2168 }
2169 }
2170
2171 #[allow(dead_code)]
2176 pub(crate) fn generate_predictions(&mut self, max_count: usize) -> Vec<DiscoveryCandidate> {
2177 if !self.enable_symmetric_prediction {
2178 return Vec::new();
2179 }
2180
2181 let recent_events = self.get_recent_allocation_events();
2183
2184 if recent_events.is_empty() {
2185 return self.generate_heuristic_predictions(&[], max_count);
2187 }
2188
2189 if let Some(pattern) = self.analyze_allocation_pattern(&recent_events) {
2191 self.generate_pattern_based_predictions(&pattern, max_count)
2192 } else {
2193 self.generate_heuristic_predictions(
2195 &recent_events.iter().collect::<Vec<_>>(),
2196 max_count,
2197 )
2198 }
2199 }
2200
2201 #[allow(dead_code)]
2203 fn generate_pattern_based_predictions(
2204 &self,
2205 pattern: &PortAllocationPattern,
2206 max_count: usize,
2207 ) -> Vec<DiscoveryCandidate> {
2208 let mut predictions = Vec::new();
2209
2210 match pattern.pattern_type {
2211 AllocationPatternType::Sequential => {
2212 for i in 1..=max_count as u16 {
2214 let predicted_port = pattern.base_port.wrapping_add(i);
2215 if self.is_valid_port(predicted_port) {
2216 predictions.push(
2217 self.create_predicted_candidate(predicted_port, pattern.confidence),
2218 );
2219 }
2220 }
2221 }
2222 AllocationPatternType::FixedStride => {
2223 for i in 1..=max_count as u16 {
2225 let predicted_port = pattern.base_port.wrapping_add(pattern.stride * i);
2226 if self.is_valid_port(predicted_port) {
2227 predictions.push(
2228 self.create_predicted_candidate(predicted_port, pattern.confidence),
2229 );
2230 }
2231 }
2232 }
2233 AllocationPatternType::PoolBased => {
2234 if let Some((min_port, max_port)) = pattern.pool_boundaries {
2236 let pool_size = max_port - min_port + 1;
2237 let step = (pool_size / max_count as u16).max(1);
2238
2239 for i in 0..max_count as u16 {
2240 let predicted_port = min_port + (i * step);
2241 if predicted_port <= max_port && self.is_valid_port(predicted_port) {
2242 predictions.push(self.create_predicted_candidate(
2243 predicted_port,
2244 pattern.confidence * 0.8,
2245 ));
2246 }
2247 }
2248 }
2249 }
2250 AllocationPatternType::TimeBased => {
2251 for i in 1..=max_count as u16 {
2254 let predicted_port = pattern.base_port.wrapping_add(i);
2255 if self.is_valid_port(predicted_port) {
2256 predictions.push(
2257 self.create_predicted_candidate(
2258 predicted_port,
2259 pattern.confidence * 0.6,
2260 ),
2261 );
2262 }
2263 }
2264 }
2265 AllocationPatternType::Random | AllocationPatternType::Unknown => {
2266 predictions
2268 .extend(self.generate_statistical_predictions(pattern.base_port, max_count));
2269 }
2270 }
2271
2272 predictions
2273 }
2274
2275 #[allow(dead_code)]
2277 fn generate_heuristic_predictions(
2278 &self,
2279 recent_events: &[&PortAllocationEvent],
2280 max_count: usize,
2281 ) -> Vec<DiscoveryCandidate> {
2282 let mut predictions = Vec::new();
2283
2284 if let Some(latest_event) = recent_events.first() {
2285 let base_port = latest_event.port;
2286
2287 for i in 1..=(max_count / 3) as u16 {
2291 let predicted_port = base_port.wrapping_add(i);
2292 if self.is_valid_port(predicted_port) {
2293 predictions.push(self.create_predicted_candidate(predicted_port, 0.7));
2294 }
2295 }
2296
2297 if base_port % 2 == 0 {
2299 let predicted_port = base_port + 1;
2300 if self.is_valid_port(predicted_port) {
2301 predictions.push(self.create_predicted_candidate(predicted_port, 0.6));
2302 }
2303 }
2304
2305 for stride in [2, 4, 8, 16] {
2307 if predictions.len() >= max_count {
2308 break;
2309 }
2310 let predicted_port = base_port.wrapping_add(stride);
2311 if self.is_valid_port(predicted_port) {
2312 predictions.push(self.create_predicted_candidate(predicted_port, 0.5));
2313 }
2314 }
2315
2316 if recent_events.len() >= 2 {
2318 let stride = recent_events[0].port.wrapping_sub(recent_events[1].port);
2319 if stride > 0 && stride <= 100 {
2320 for i in 1..=3 {
2322 if predictions.len() >= max_count {
2323 break;
2324 }
2325 let predicted_port = base_port.wrapping_add(stride * i);
2326 if self.is_valid_port(predicted_port) {
2327 predictions.push(self.create_predicted_candidate(predicted_port, 0.4));
2328 }
2329 }
2330 }
2331 }
2332 }
2333
2334 predictions.truncate(max_count);
2335 predictions
2336 }
2337
2338 #[allow(dead_code)]
2340 fn generate_statistical_predictions(
2341 &self,
2342 base_port: u16,
2343 max_count: usize,
2344 ) -> Vec<DiscoveryCandidate> {
2345 let mut predictions = Vec::new();
2346
2347 let common_ranges = [
2349 (1024, 5000), (5000, 10000), (10000, 20000), (32768, 65535), ];
2354
2355 let current_range = common_ranges
2357 .iter()
2358 .find(|(min, max)| base_port >= *min && base_port <= *max)
2359 .copied()
2360 .unwrap_or((1024, 65535));
2361
2362 let range_size = current_range.1 - current_range.0;
2364 let step = (range_size / max_count as u16).max(1);
2365
2366 for i in 0..max_count {
2367 let offset = (i as u16 * step) % range_size;
2368 let predicted_port = current_range.0 + offset;
2369
2370 if self.is_valid_port(predicted_port) && predicted_port != base_port {
2371 predictions.push(self.create_predicted_candidate(predicted_port, 0.3));
2372 }
2373 }
2374
2375 predictions
2376 }
2377
2378 #[allow(dead_code)]
2380 fn is_valid_port(&self, port: u16) -> bool {
2381 const COMMON_PORTS_TO_AVOID: &[u16] = &[
2385 21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 3389, 5432, 3306, 6379, 27017, ];
2401
2402 port != 0 && port >= 1024 && !COMMON_PORTS_TO_AVOID.contains(&port)
2403 }
2404
2405 #[allow(dead_code)]
2407 fn create_predicted_candidate(&self, port: u16, confidence: f64) -> DiscoveryCandidate {
2408 let base_priority = 50; let priority = (base_priority as f64 * confidence) as u32;
2412
2413 DiscoveryCandidate {
2414 address: SocketAddr::new(
2415 "0.0.0.0"
2416 .parse()
2417 .expect("Failed to parse hardcoded placeholder IP"),
2418 port,
2419 ),
2420 priority,
2421 source: DiscoverySourceType::Predicted,
2422 state: CandidateState::New,
2423 }
2424 }
2425
2426 #[allow(dead_code)]
2428 pub(crate) fn analyze_allocation_patterns(
2429 &self,
2430 history: &VecDeque<PortAllocationEvent>,
2431 ) -> Option<PortAllocationPattern> {
2432 if history.len() < 3 {
2433 return None;
2434 }
2435
2436 let recent_ports: Vec<u16> = history
2437 .iter()
2438 .rev()
2439 .take(10)
2440 .map(|event| event.port)
2441 .collect();
2442
2443 if let Some(pattern) = self.detect_sequential_pattern(&recent_ports) {
2445 return Some(pattern);
2446 }
2447
2448 if let Some(pattern) = self.detect_stride_pattern(&recent_ports) {
2450 return Some(pattern);
2451 }
2452
2453 if let Some(pattern) = self.detect_pool_pattern(&recent_ports) {
2455 return Some(pattern);
2456 }
2457
2458 if let Some(pattern) = self.detect_time_based_pattern(history) {
2460 return Some(pattern);
2461 }
2462
2463 None
2464 }
2465
2466 #[allow(dead_code)]
2468 fn detect_sequential_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2469 if ports.len() < 3 {
2470 return None;
2471 }
2472
2473 let mut sequential_count = 0;
2474 let mut total_comparisons = 0;
2475
2476 for i in 1..ports.len() {
2477 total_comparisons += 1;
2478 let diff = ports[i - 1].wrapping_sub(ports[i]);
2479 if diff == 1 {
2480 sequential_count += 1;
2481 }
2482 }
2483
2484 let sequential_ratio = sequential_count as f64 / total_comparisons as f64;
2485
2486 if sequential_ratio >= 0.6 {
2487 let confidence = (sequential_ratio * 0.9).min(0.9); Some(PortAllocationPattern {
2491 pattern_type: AllocationPatternType::Sequential,
2492 base_port: ports[0],
2493 stride: 1,
2494 pool_boundaries: None,
2495 confidence,
2496 })
2497 } else {
2498 None
2499 }
2500 }
2501
2502 #[allow(dead_code)]
2504 fn detect_stride_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2505 if ports.len() < 4 {
2506 return None;
2507 }
2508
2509 let mut diffs = Vec::new();
2511 for i in 1..ports.len() {
2512 let diff = ports[i - 1].wrapping_sub(ports[i]);
2513 if diff > 0 && diff <= 1000 {
2514 diffs.push(diff);
2516 }
2517 }
2518
2519 if diffs.len() < 2 {
2520 return None;
2521 }
2522
2523 let mut diff_counts = std::collections::HashMap::new();
2525 for &diff in &diffs {
2526 *diff_counts.entry(diff).or_insert(0) += 1;
2527 }
2528
2529 let (most_common_diff, count) = diff_counts
2530 .iter()
2531 .max_by_key(|&(_, &count)| count)
2532 .map(|(&diff, &count)| (diff, count))?;
2533
2534 let consistency_ratio = count as f64 / diffs.len() as f64;
2535
2536 if consistency_ratio >= 0.5 && most_common_diff > 1 {
2537 let confidence = (consistency_ratio * 0.8).min(0.8); Some(PortAllocationPattern {
2541 pattern_type: AllocationPatternType::FixedStride,
2542 base_port: ports[0],
2543 stride: most_common_diff,
2544 pool_boundaries: None,
2545 confidence,
2546 })
2547 } else {
2548 None
2549 }
2550 }
2551
2552 #[allow(dead_code)]
2554 fn detect_pool_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2555 if ports.len() < 5 {
2556 return None;
2557 }
2558
2559 let min_port = *ports.iter().min()?;
2560 let max_port = *ports.iter().max()?;
2561 let range = max_port - min_port;
2562
2563 if range > 0 && range <= 10000 {
2565 let expected_step = range / (ports.len() as u16 - 1);
2568 let mut uniform_score = 0.0;
2569
2570 let mut sorted_ports = ports.to_vec();
2571 sorted_ports.sort_unstable();
2572
2573 for i in 1..sorted_ports.len() {
2574 let actual_step = sorted_ports[i] - sorted_ports[i - 1];
2575 let step_diff = (actual_step as i32 - expected_step as i32).abs() as f64;
2576 let normalized_diff = step_diff / expected_step as f64;
2577 uniform_score += 1.0 - normalized_diff.min(1.0);
2578 }
2579
2580 uniform_score /= (sorted_ports.len() - 1) as f64;
2581
2582 if uniform_score >= 0.4 {
2583 let confidence = (uniform_score * 0.7).min(0.7); Some(PortAllocationPattern {
2587 pattern_type: AllocationPatternType::PoolBased,
2588 base_port: min_port,
2589 stride: expected_step,
2590 pool_boundaries: Some((min_port, max_port)),
2591 confidence,
2592 })
2593 } else {
2594 None
2595 }
2596 } else {
2597 None
2598 }
2599 }
2600
2601 #[allow(dead_code)]
2603 fn detect_time_based_pattern(
2604 &self,
2605 history: &VecDeque<PortAllocationEvent>,
2606 ) -> Option<PortAllocationPattern> {
2607 if history.len() < 4 {
2608 return None;
2609 }
2610
2611 let mut time_intervals = Vec::new();
2613 let events: Vec<_> = history.iter().collect();
2614
2615 for i in 1..events.len() {
2616 let interval = events[i - 1].timestamp.duration_since(events[i].timestamp);
2617 time_intervals.push(interval);
2618 }
2619
2620 if time_intervals.is_empty() {
2621 return None;
2622 }
2623
2624 let avg_interval =
2626 time_intervals.iter().sum::<std::time::Duration>() / time_intervals.len() as u32;
2627
2628 let mut consistency_score = 0.0;
2629 for interval in &time_intervals {
2630 let diff = (*interval).abs_diff(avg_interval);
2631
2632 let normalized_diff = diff.as_millis() as f64 / avg_interval.as_millis() as f64;
2633 consistency_score += 1.0 - normalized_diff.min(1.0);
2634 }
2635
2636 consistency_score /= time_intervals.len() as f64;
2637
2638 if consistency_score >= 0.6
2639 && avg_interval.as_millis() > 100
2640 && avg_interval.as_millis() < 10000
2641 {
2642 let confidence = (consistency_score * 0.6).min(0.6); Some(PortAllocationPattern {
2645 pattern_type: AllocationPatternType::TimeBased,
2646 base_port: events[0].port,
2647 stride: 1, pool_boundaries: None,
2649 confidence,
2650 })
2651 } else {
2652 None
2653 }
2654 }
2655
2656 #[allow(dead_code)]
2658 pub(crate) fn generate_confidence_scored_predictions(
2659 &mut self,
2660 base_address: SocketAddr,
2661 _max_count: usize,
2662 ) -> Vec<(DiscoveryCandidate, f64)> {
2663 let scored_predictions = Vec::new();
2664
2665 let _ = base_address;
2667
2668 scored_predictions
2669 }
2670
2671 #[allow(dead_code)]
2673 fn calculate_prediction_confidence(&self) -> f64 {
2674 0.0
2675 }
2676
2677 #[allow(dead_code)]
2679 pub(crate) fn update_pattern_analysis(&self, _new_event: PortAllocationEvent) {}
2680}
2681
2682#[cfg(any())]
2684#[allow(dead_code)]
2685#[derive(Debug)]
2686pub(crate) struct BootstrapNodeManager {
2687 config: DiscoveryConfig,
2688 bootstrap_nodes: HashMap<BootstrapNodeId, BootstrapNodeInfo>,
2689 health_stats: HashMap<BootstrapNodeId, BootstrapHealthStats>,
2690 performance_tracker: BootstrapPerformanceTracker,
2691 last_health_check: Option<Instant>,
2692 health_check_interval: Duration,
2693 failover_threshold: f64,
2694 discovery_sources: Vec<BootstrapDiscoverySource>,
2695}
2696
2697#[cfg(any())]
2698#[allow(dead_code)]
2699#[derive(Debug, Clone)]
2700pub(crate) struct BootstrapNodeInfo {
2701 pub address: SocketAddr,
2702 pub last_seen: Instant,
2703 pub can_coordinate: bool,
2704 pub health_status: BootstrapHealthStatus,
2705 pub capabilities: BootstrapCapabilities,
2706 pub priority: u32,
2707 pub discovery_source: BootstrapDiscoverySource,
2708}
2709
2710#[cfg(any())]
2711#[allow(dead_code)]
2712#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2713pub(crate) enum BootstrapHealthStatus {
2714 Healthy,
2715 Degraded,
2716 Unhealthy,
2717 Unknown,
2718}
2719
2720#[cfg(any())]
2721#[allow(dead_code)]
2722#[derive(Debug, Clone, Default)]
2723pub(crate) struct BootstrapCapabilities {
2724 pub supports_nat_traversal: bool,
2725 pub supports_ipv6: bool,
2726 pub supports_quic_extensions: bool,
2727 pub max_concurrent_coordinations: u32,
2728 pub supported_quic_versions: Vec<u32>,
2729}
2730
2731#[cfg(any())]
2732#[allow(dead_code)]
2733#[derive(Debug, Clone, Default)]
2734pub(crate) struct BootstrapHealthStats {
2735 pub connection_attempts: u32,
2736 pub successful_connections: u32,
2737 pub failed_connections: u32,
2738 pub average_rtt: Option<Duration>,
2739 pub recent_rtts: VecDeque<Duration>,
2740 pub last_health_check: Option<Instant>,
2741 pub consecutive_failures: u32,
2742 pub coordination_requests: u32,
2743 pub successful_coordinations: u32,
2744}
2745
2746#[cfg(any())]
2747#[allow(dead_code)]
2748#[derive(Debug, Default)]
2749pub(crate) struct BootstrapPerformanceTracker {
2750 pub overall_success_rate: f64,
2751 pub average_response_time: Duration,
2752 pub best_performers: Vec<BootstrapNodeId>,
2753 pub performance_history: VecDeque<PerformanceSnapshot>,
2754}
2755
2756#[cfg(any())]
2757#[allow(dead_code)]
2758#[derive(Debug, Clone)]
2759pub(crate) struct PerformanceSnapshot {
2760 pub timestamp: Instant,
2761 pub active_nodes: u32,
2762 pub success_rate: f64,
2763 pub average_rtt: Duration,
2764}
2765
2766#[cfg(any())]
2767#[allow(dead_code)]
2768#[derive(Debug, Clone, PartialEq, Eq)]
2769pub(crate) enum BootstrapDiscoverySource {
2770 Static,
2771 DNS,
2772 UserProvided,
2773}
2774
2775#[cfg(any())]
2776#[allow(dead_code)]
2777impl BootstrapNodeManager {
2778 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2779 Self {
2780 config: config.clone(),
2781 bootstrap_nodes: HashMap::new(),
2782 health_stats: HashMap::new(),
2783 performance_tracker: BootstrapPerformanceTracker::default(),
2784 last_health_check: None,
2785 health_check_interval: Duration::from_secs(30),
2786 failover_threshold: 0.3, discovery_sources: vec![
2788 BootstrapDiscoverySource::Static,
2789 BootstrapDiscoverySource::DNS,
2790 BootstrapDiscoverySource::UserProvided,
2791 ],
2792 }
2793 }
2794
2795 #[allow(dead_code)]
2797 pub(crate) fn update_bootstrap_nodes(&mut self, nodes: Vec<BootstrapNode>) {
2798 let now = Instant::now();
2799
2800 for (i, node) in nodes.into_iter().enumerate() {
2802 let node_id = BootstrapNodeId(i as u64);
2803
2804 let node_info = BootstrapNodeInfo {
2805 address: node.address,
2806 last_seen: node.last_seen,
2807 can_coordinate: node.can_coordinate,
2808 health_status: BootstrapHealthStatus::Unknown,
2809 capabilities: BootstrapCapabilities {
2810 supports_nat_traversal: node.can_coordinate,
2811 supports_ipv6: node.address.is_ipv6(),
2812 supports_quic_extensions: true, max_concurrent_coordinations: 100, supported_quic_versions: vec![1], },
2816 priority: self.calculate_initial_priority(&node),
2817 discovery_source: BootstrapDiscoverySource::UserProvided,
2818 };
2819
2820 self.bootstrap_nodes.insert(node_id, node_info);
2821
2822 self.health_stats.entry(node_id).or_default();
2824 }
2825
2826 info!("Updated {} bootstrap nodes", self.bootstrap_nodes.len());
2827 self.schedule_health_check(now);
2828 }
2829
2830 #[allow(dead_code)]
2832 pub(crate) fn get_active_bootstrap_nodes(&self) -> Vec<BootstrapNodeId> {
2833 let mut active_nodes: Vec<_> = self
2834 .bootstrap_nodes
2835 .iter()
2836 .filter(|(_, node)| {
2837 matches!(
2838 node.health_status,
2839 BootstrapHealthStatus::Healthy | BootstrapHealthStatus::Unknown
2840 )
2841 })
2842 .map(|(&id, node)| (id, node))
2843 .collect();
2844
2845 active_nodes.sort_by(|a, b| {
2847 let health_cmp = self.compare_health_status(a.1.health_status, b.1.health_status);
2849 if health_cmp != std::cmp::Ordering::Equal {
2850 return health_cmp;
2851 }
2852
2853 b.1.priority.cmp(&a.1.priority)
2855 });
2856
2857 active_nodes.into_iter().map(|(id, _)| id).collect()
2858 }
2859
2860 #[allow(dead_code)]
2862 pub(crate) fn get_bootstrap_address(&self, id: BootstrapNodeId) -> Option<SocketAddr> {
2863 self.bootstrap_nodes.get(&id).map(|node| node.address)
2864 }
2865
2866 #[allow(dead_code)]
2868 pub(crate) fn perform_health_check(&mut self, now: Instant) {
2869 if let Some(last_check) = self.last_health_check {
2870 if now.duration_since(last_check) < self.health_check_interval {
2871 return; }
2873 }
2874
2875 debug!(
2876 "Performing health check on {} bootstrap nodes",
2877 self.bootstrap_nodes.len()
2878 );
2879
2880 let node_ids: Vec<BootstrapNodeId> = self.bootstrap_nodes.keys().copied().collect();
2882
2883 for node_id in node_ids {
2884 self.check_node_health(node_id, now);
2885 }
2886
2887 self.update_performance_metrics(now);
2888 self.last_health_check = Some(now);
2889 }
2890
2891 #[allow(dead_code)]
2893 fn check_node_health(&mut self, node_id: BootstrapNodeId, now: Instant) {
2894 let node_info_opt = self.bootstrap_nodes.get(&node_id).cloned();
2896 let node_info_for_priority = match node_info_opt {
2897 Some(node_info) => node_info,
2898 None => return, };
2900 let current_health_status = node_info_for_priority.health_status;
2901
2902 let (_success_rate, new_health_status, _average_rtt) = {
2904 let stats = self.health_stats.get_mut(&node_id).unwrap();
2905
2906 let success_rate = if stats.connection_attempts > 0 {
2908 stats.successful_connections as f64 / stats.connection_attempts as f64
2909 } else {
2910 1.0 };
2912
2913 if !stats.recent_rtts.is_empty() {
2915 let total_rtt: Duration = stats.recent_rtts.iter().sum();
2916 stats.average_rtt = Some(total_rtt / stats.recent_rtts.len() as u32);
2917 }
2918
2919 let new_health_status = if stats.consecutive_failures >= 3 {
2921 BootstrapHealthStatus::Unhealthy
2922 } else if success_rate < self.failover_threshold {
2923 BootstrapHealthStatus::Degraded
2924 } else if success_rate >= 0.8 && stats.consecutive_failures == 0 {
2925 BootstrapHealthStatus::Healthy
2926 } else {
2927 current_health_status };
2929
2930 stats.last_health_check = Some(now);
2931
2932 (success_rate, new_health_status, stats.average_rtt)
2933 };
2934
2935 let stats_snapshot = match self.health_stats.get(&node_id) {
2937 Some(stats) => stats,
2938 None => {
2939 warn!("No health stats found for bootstrap node {:?}", node_id);
2940 return;
2941 }
2942 };
2943 let new_priority = self.calculate_dynamic_priority(&node_info_for_priority, stats_snapshot);
2944
2945 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2947 if new_health_status != node_info.health_status {
2948 info!(
2949 "Bootstrap node {:?} health status changed: {:?} -> {:?}",
2950 node_id, node_info.health_status, new_health_status
2951 );
2952 node_info.health_status = new_health_status;
2953 }
2954
2955 node_info.priority = new_priority;
2956 }
2957 }
2958
2959 #[allow(dead_code)]
2961 pub(crate) fn record_connection_attempt(
2962 &mut self,
2963 node_id: BootstrapNodeId,
2964 success: bool,
2965 rtt: Option<Duration>,
2966 ) {
2967 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2968 stats.connection_attempts += 1;
2969
2970 if success {
2971 stats.successful_connections += 1;
2972 stats.consecutive_failures = 0;
2973
2974 if let Some(rtt) = rtt {
2975 stats.recent_rtts.push_back(rtt);
2976 if stats.recent_rtts.len() > 10 {
2977 stats.recent_rtts.pop_front();
2978 }
2979 }
2980 } else {
2981 stats.failed_connections += 1;
2982 stats.consecutive_failures += 1;
2983 }
2984 }
2985
2986 if success {
2988 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2989 node_info.last_seen = Instant::now();
2990 }
2991 }
2992 }
2993
2994 #[allow(dead_code)]
2996 pub(crate) fn record_coordination_result(&mut self, node_id: BootstrapNodeId, success: bool) {
2997 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2998 stats.coordination_requests += 1;
2999 if success {
3000 stats.successful_coordinations += 1;
3001 }
3002 }
3003 }
3004
3005 #[allow(dead_code)]
3007 pub(crate) fn get_best_performers(&self, count: usize) -> Vec<BootstrapNodeId> {
3008 let mut nodes_with_scores: Vec<_> = self
3009 .bootstrap_nodes
3010 .iter()
3011 .filter_map(|(&id, node)| {
3012 if matches!(node.health_status, BootstrapHealthStatus::Healthy) {
3013 let score = self.calculate_performance_score(id, node);
3014 Some((id, score))
3015 } else {
3016 None
3017 }
3018 })
3019 .collect();
3020
3021 nodes_with_scores
3022 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3023
3024 nodes_with_scores
3025 .into_iter()
3026 .take(count)
3027 .map(|(id, _)| id)
3028 .collect()
3029 }
3030
3031 #[allow(dead_code)]
3033 pub(crate) fn discover_new_nodes(&mut self) -> Result<Vec<BootstrapNodeInfo>, String> {
3034 let mut discovered_nodes = Vec::new();
3035
3036 if let Ok(dns_nodes) = self.discover_via_dns() {
3038 discovered_nodes.extend(dns_nodes);
3039 }
3040
3041 if let Ok(multicast_nodes) = self.discover_via_multicast() {
3043 discovered_nodes.extend(multicast_nodes);
3044 }
3045
3046 for node in &discovered_nodes {
3048 let node_id = BootstrapNodeId(rand::random());
3049 self.bootstrap_nodes.insert(node_id, node.clone());
3050 self.health_stats
3051 .insert(node_id, BootstrapHealthStats::default());
3052 }
3053
3054 if !discovered_nodes.is_empty() {
3055 info!("Discovered {} new bootstrap nodes", discovered_nodes.len());
3056 }
3057
3058 Ok(discovered_nodes)
3059 }
3060
3061 #[allow(dead_code)]
3063 fn discover_via_dns(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
3064 debug!("DNS-based bootstrap discovery not yet implemented");
3067 Ok(Vec::new())
3068 }
3069
3070 #[allow(dead_code)]
3072 fn discover_via_multicast(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
3073 debug!("Multicast-based bootstrap discovery not yet implemented");
3076 Ok(Vec::new())
3077 }
3078
3079 #[allow(dead_code)]
3081 fn calculate_initial_priority(&self, node: &BootstrapNode) -> u32 {
3082 let mut priority = 100; if node.can_coordinate {
3085 priority += 50;
3086 }
3087
3088 if let Some(rtt) = node.rtt {
3089 if rtt < Duration::from_millis(50) {
3090 priority += 30;
3091 } else if rtt < Duration::from_millis(100) {
3092 priority += 20;
3093 } else if rtt < Duration::from_millis(200) {
3094 priority += 10;
3095 }
3096 }
3097
3098 if node.address.is_ipv6() {
3100 priority += 10;
3101 }
3102
3103 priority
3104 }
3105
3106 #[allow(dead_code)]
3108 fn calculate_dynamic_priority(
3109 &self,
3110 node_info: &BootstrapNodeInfo,
3111 stats: &BootstrapHealthStats,
3112 ) -> u32 {
3113 let mut priority = node_info.priority;
3114
3115 let success_rate = if stats.connection_attempts > 0 {
3117 stats.successful_connections as f64 / stats.connection_attempts as f64
3118 } else {
3119 1.0
3120 };
3121
3122 priority = (priority as f64 * success_rate) as u32;
3123
3124 if let Some(avg_rtt) = stats.average_rtt {
3126 if avg_rtt < Duration::from_millis(50) {
3127 priority += 20;
3128 } else if avg_rtt > Duration::from_millis(500) {
3129 priority = priority.saturating_sub(20);
3130 }
3131 }
3132
3133 priority = priority.saturating_sub(stats.consecutive_failures * 10);
3135
3136 priority.max(1) }
3138
3139 #[allow(dead_code)]
3141 fn calculate_performance_score(
3142 &self,
3143 node_id: BootstrapNodeId,
3144 _node_info: &BootstrapNodeInfo,
3145 ) -> f64 {
3146 let stats = self.health_stats.get(&node_id).unwrap();
3147
3148 let mut score = 0.0;
3149
3150 let success_rate = if stats.connection_attempts > 0 {
3152 stats.successful_connections as f64 / stats.connection_attempts as f64
3153 } else {
3154 1.0
3155 };
3156 score += success_rate * 0.4;
3157
3158 if let Some(avg_rtt) = stats.average_rtt {
3160 let rtt_score = (1000.0 - avg_rtt.as_millis() as f64).max(0.0) / 1000.0;
3161 score += rtt_score * 0.3;
3162 } else {
3163 score += 0.3; }
3165
3166 let coord_success_rate = if stats.coordination_requests > 0 {
3168 stats.successful_coordinations as f64 / stats.coordination_requests as f64
3169 } else {
3170 1.0
3171 };
3172 score += coord_success_rate * 0.2;
3173
3174 let stability_score = if stats.consecutive_failures == 0 {
3176 1.0
3177 } else {
3178 1.0 / (stats.consecutive_failures as f64 + 1.0)
3179 };
3180 score += stability_score * 0.1;
3181
3182 score
3183 }
3184
3185 fn compare_health_status(
3187 &self,
3188 a: BootstrapHealthStatus,
3189 b: BootstrapHealthStatus,
3190 ) -> std::cmp::Ordering {
3191 use std::cmp::Ordering;
3192
3193 match (a, b) {
3194 (BootstrapHealthStatus::Healthy, BootstrapHealthStatus::Healthy) => Ordering::Equal,
3195 (BootstrapHealthStatus::Healthy, _) => Ordering::Less, (_, BootstrapHealthStatus::Healthy) => Ordering::Greater,
3197 (BootstrapHealthStatus::Unknown, BootstrapHealthStatus::Unknown) => Ordering::Equal,
3198 (BootstrapHealthStatus::Unknown, _) => Ordering::Less, (_, BootstrapHealthStatus::Unknown) => Ordering::Greater,
3200 (BootstrapHealthStatus::Degraded, BootstrapHealthStatus::Degraded) => Ordering::Equal,
3201 (BootstrapHealthStatus::Degraded, _) => Ordering::Less, (_, BootstrapHealthStatus::Degraded) => Ordering::Greater,
3203 (BootstrapHealthStatus::Unhealthy, BootstrapHealthStatus::Unhealthy) => Ordering::Equal,
3204 }
3205 }
3206
3207 fn update_performance_metrics(&mut self, now: Instant) {
3209 let mut total_attempts = 0;
3210 let mut total_successes = 0;
3211 let mut total_rtt = Duration::ZERO;
3212 let mut rtt_count = 0;
3213
3214 for stats in self.health_stats.values() {
3215 total_attempts += stats.connection_attempts;
3216 total_successes += stats.successful_connections;
3217
3218 if let Some(avg_rtt) = stats.average_rtt {
3219 total_rtt += avg_rtt;
3220 rtt_count += 1;
3221 }
3222 }
3223
3224 self.performance_tracker.overall_success_rate = if total_attempts > 0 {
3225 total_successes as f64 / total_attempts as f64
3226 } else {
3227 1.0
3228 };
3229
3230 self.performance_tracker.average_response_time = if rtt_count > 0 {
3231 total_rtt / rtt_count
3232 } else {
3233 Duration::from_millis(100) };
3235
3236 self.performance_tracker.best_performers = self.get_best_performers(5);
3238
3239 let snapshot = PerformanceSnapshot {
3241 timestamp: now,
3242 active_nodes: self.get_active_bootstrap_nodes().len() as u32,
3243 success_rate: self.performance_tracker.overall_success_rate,
3244 average_rtt: self.performance_tracker.average_response_time,
3245 };
3246
3247 self.performance_tracker
3248 .performance_history
3249 .push_back(snapshot);
3250 if self.performance_tracker.performance_history.len() > 100 {
3251 self.performance_tracker.performance_history.pop_front();
3252 }
3253 }
3254
3255 fn schedule_health_check(&mut self, _now: Instant) {
3257 }
3260
3261 pub(crate) fn get_performance_stats(&self) -> &BootstrapPerformanceTracker {
3263 &self.performance_tracker
3264 }
3265
3266 pub(crate) fn get_node_health_stats(
3268 &self,
3269 node_id: BootstrapNodeId,
3270 ) -> Option<&BootstrapHealthStats> {
3271 self.health_stats.get(&node_id)
3272 }
3273}
3274
3275#[derive(Debug)]
3277pub(crate) struct DiscoveryCache {
3278 #[allow(dead_code)]
3279 config: DiscoveryConfig,
3280}
3281
3282impl DiscoveryCache {
3283 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3284 Self {
3285 config: config.clone(),
3286 }
3287 }
3288}
3289
3290pub(crate) fn create_platform_interface_discovery() -> Box<dyn NetworkInterfaceDiscovery + Send> {
3292 #[cfg(target_os = "windows")]
3293 return Box::new(WindowsInterfaceDiscovery::new());
3294
3295 #[cfg(target_os = "linux")]
3296 return Box::new(LinuxInterfaceDiscovery::new());
3297
3298 #[cfg(all(target_os = "macos", feature = "network-discovery"))]
3299 return Box::new(MacOSInterfaceDiscovery::new());
3300
3301 #[cfg(all(target_os = "macos", not(feature = "network-discovery")))]
3302 return Box::new(GenericInterfaceDiscovery::new());
3303
3304 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
3305 return Box::new(GenericInterfaceDiscovery::new());
3306}
3307
3308#[allow(dead_code)]
3318pub(crate) struct GenericInterfaceDiscovery {
3319 scan_complete: bool,
3320}
3321
3322impl GenericInterfaceDiscovery {
3323 #[allow(dead_code)]
3324 pub(crate) fn new() -> Self {
3325 Self {
3326 scan_complete: false,
3327 }
3328 }
3329}
3330
3331impl NetworkInterfaceDiscovery for GenericInterfaceDiscovery {
3332 fn start_scan(&mut self) -> Result<(), String> {
3333 self.scan_complete = true;
3335 Ok(())
3336 }
3337
3338 #[allow(clippy::panic)]
3339 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>> {
3340 if self.scan_complete {
3341 self.scan_complete = false;
3342 Some(vec![NetworkInterface {
3343 name: "generic".to_string(),
3344 addresses: vec![
3345 "127.0.0.1:0"
3346 .parse()
3347 .unwrap_or_else(|_| panic!("Failed to parse hardcoded localhost address")),
3348 ],
3349 is_up: true,
3350 is_wireless: false,
3351 mtu: Some(1500),
3352 }])
3353 } else {
3354 None
3355 }
3356 }
3357}
3358
3359impl std::fmt::Display for DiscoveryError {
3360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3361 match self {
3362 Self::NoLocalInterfaces => write!(f, "no local network interfaces found"),
3363 Self::AllBootstrapsFailed => write!(f, "all bootstrap node queries failed"),
3364 Self::DiscoveryTimeout => write!(f, "discovery process timed out"),
3365 Self::InsufficientCandidates { found, required } => {
3366 write!(f, "insufficient candidates found: {found} < {required}")
3367 }
3368 Self::NetworkError(msg) => write!(f, "network error: {msg}"),
3369 Self::ConfigurationError(msg) => write!(f, "configuration error: {msg}"),
3370 Self::InternalError(msg) => write!(f, "internal error: {msg}"),
3371 }
3372 }
3373}
3374
3375impl std::error::Error for DiscoveryError {}
3376
3377pub mod test_utils {
3379 use super::*;
3380
3381 pub fn calculate_address_priority(address: &IpAddr) -> u32 {
3383 let mut priority = 100; match address {
3385 IpAddr::V4(ipv4) => {
3386 if ipv4.is_private() {
3387 priority += 50; }
3389 }
3390 IpAddr::V6(ipv6) => {
3391 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
3394 let segments = ipv6.segments();
3395 if segments[0] & 0xE000 == 0x2000 {
3396 priority += 60;
3398 } else if segments[0] & 0xFFC0 == 0xFE80 {
3399 priority += 20;
3401 } else if segments[0] & 0xFE00 == 0xFC00 {
3402 priority += 40;
3404 } else {
3405 priority += 30;
3407 }
3408 }
3409
3410 priority += 10; }
3413 }
3414 priority
3415 }
3416
3417 pub fn is_valid_address(address: &IpAddr) -> bool {
3419 match address {
3420 IpAddr::V4(ipv4) => !ipv4.is_loopback() && !ipv4.is_unspecified(),
3421 IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
3422 }
3423 }
3424}
3425
3426#[cfg(test)]
3427mod tests {
3428 use super::*;
3429
3430 fn create_test_manager() -> CandidateDiscoveryManager {
3431 let config = DiscoveryConfig {
3432 total_timeout: Duration::from_secs(30),
3433 local_scan_timeout: Duration::from_secs(5),
3434 bootstrap_query_timeout: Duration::from_secs(10),
3435 max_query_retries: 3,
3436 max_candidates: 50,
3437 enable_symmetric_prediction: true,
3438 min_bootstrap_consensus: 2,
3439 interface_cache_ttl: Duration::from_secs(300),
3440 server_reflexive_cache_ttl: Duration::from_secs(600),
3441 bound_address: None,
3442 };
3443 CandidateDiscoveryManager::new(config)
3444 }
3445
3446 #[test]
3447 fn test_accept_quic_discovered_addresses() {
3448 let mut manager = create_test_manager();
3449 let peer_id = PeerId([1; 32]);
3450
3451 manager
3453 .start_discovery(peer_id, vec![])
3454 .expect("Failed to start discovery in test");
3455
3456 let discovered_addr = "192.168.1.100:5000"
3458 .parse()
3459 .expect("Failed to parse test address");
3460 let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3461
3462 assert!(result.is_ok());
3463
3464 if let Some(session) = manager.active_sessions.get(&peer_id) {
3466 let found = session.discovered_candidates.iter().any(|c| {
3467 c.address == discovered_addr
3468 && matches!(c.source, DiscoverySourceType::ServerReflexive)
3469 });
3470 assert!(found, "QUIC-discovered address should be in candidates");
3471 }
3472 }
3473
3474 #[test]
3475 fn test_accept_quic_discovered_addresses_no_session() {
3476 let mut manager = create_test_manager();
3477 let peer_id = PeerId([1; 32]);
3478 let discovered_addr = "192.168.1.100:5000"
3479 .parse()
3480 .expect("Failed to parse test address");
3481
3482 let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3484
3485 assert!(result.is_err());
3486 match result {
3487 Err(DiscoveryError::InternalError(msg)) => {
3488 assert!(msg.contains("No active discovery session"));
3489 }
3490 _ => panic!("Expected InternalError for missing session"),
3491 }
3492 }
3493
3494 #[test]
3495 fn test_accept_quic_discovered_addresses_deduplication() {
3496 let mut manager = create_test_manager();
3497 let peer_id = PeerId([1; 32]);
3498
3499 manager
3501 .start_discovery(peer_id, vec![])
3502 .expect("Failed to start discovery in test");
3503
3504 let discovered_addr = "192.168.1.100:5000"
3506 .parse()
3507 .expect("Failed to parse test address");
3508 let result1 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3509 let result2 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3510
3511 assert!(result1.is_ok());
3512 assert!(result2.is_ok()); if let Some(session) = manager.active_sessions.get(&peer_id) {
3516 let count = session
3517 .discovered_candidates
3518 .iter()
3519 .filter(|c| c.address == discovered_addr)
3520 .count();
3521 assert_eq!(count, 1, "Should not have duplicate addresses");
3522 }
3523 }
3524
3525 #[test]
3526 fn test_accept_quic_discovered_addresses_priority() {
3527 let mut manager = create_test_manager();
3528 let peer_id = PeerId([1; 32]);
3529
3530 manager
3532 .start_discovery(peer_id, vec![])
3533 .expect("Failed to start discovery in test");
3534
3535 let public_addr = "8.8.8.8:5000"
3537 .parse()
3538 .expect("Failed to parse test address");
3539 let private_addr = "192.168.1.100:5000"
3540 .parse()
3541 .expect("Failed to parse test address");
3542 let ipv6_addr = "[2001:db8::1]:5000"
3543 .parse()
3544 .expect("Failed to parse test address");
3545
3546 manager
3547 .accept_quic_discovered_address(peer_id, public_addr)
3548 .expect("Failed to accept public address in test");
3549 manager
3550 .accept_quic_discovered_address(peer_id, private_addr)
3551 .expect("Failed to accept private address in test");
3552 manager
3553 .accept_quic_discovered_address(peer_id, ipv6_addr)
3554 .unwrap();
3555
3556 if let Some(session) = manager.active_sessions.get(&peer_id) {
3558 for candidate in &session.discovered_candidates {
3559 assert!(
3560 candidate.priority > 0,
3561 "All candidates should have non-zero priority"
3562 );
3563
3564 if candidate.address == ipv6_addr {
3566 let ipv4_priority = session
3567 .discovered_candidates
3568 .iter()
3569 .find(|c| c.address == public_addr)
3570 .map(|c| c.priority)
3571 .expect("Public address should be found in candidates");
3572
3573 assert!(candidate.priority >= ipv4_priority);
3575 }
3576 }
3577 }
3578 }
3579
3580 #[test]
3581 fn test_accept_quic_discovered_addresses_event_generation() {
3582 let mut manager = create_test_manager();
3583 let peer_id = PeerId([1; 32]);
3584
3585 manager
3587 .start_discovery(peer_id, vec![])
3588 .expect("Failed to start discovery in test");
3589
3590 let discovered_addr = "192.168.1.100:5000"
3592 .parse()
3593 .expect("Failed to parse test address");
3594 manager
3595 .accept_quic_discovered_address(peer_id, discovered_addr)
3596 .expect("Failed to accept address in test");
3597
3598 let events = manager.poll_discovery_progress(peer_id);
3600
3601 let has_event = events.iter().any(|e| {
3603 matches!(e,
3604 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
3605 if candidate.address == discovered_addr
3606 )
3607 });
3608
3609 assert!(
3610 has_event,
3611 "Should generate discovery event for QUIC-discovered address"
3612 );
3613 }
3614
3615 #[test]
3616 fn test_discovery_completes_without_server_reflexive_phase() {
3617 let mut manager = create_test_manager();
3618 let peer_id = PeerId([1; 32]);
3619
3620 manager
3622 .start_discovery(peer_id, vec![])
3623 .expect("Failed to start discovery in test");
3624
3625 let discovered_addr = "192.168.1.100:5000"
3627 .parse()
3628 .expect("Failed to parse test address");
3629 manager
3630 .accept_quic_discovered_address(peer_id, discovered_addr)
3631 .expect("Failed to accept address in test");
3632
3633 let status = manager
3635 .get_discovery_status(peer_id)
3636 .expect("Failed to get discovery status in test");
3637
3638 match &status.phase {
3640 DiscoveryPhase::ServerReflexiveQuerying { .. } => {
3641 panic!("Should not be in ServerReflexiveQuerying phase when using QUIC discovery");
3642 }
3643 _ => {} }
3645 }
3646
3647 #[test]
3648 fn test_no_bootstrap_queries_when_using_quic_discovery() {
3649 let mut manager = create_test_manager();
3650 let peer_id = PeerId([1; 32]);
3651
3652 manager
3654 .start_discovery(peer_id, vec![])
3655 .expect("Failed to start discovery in test");
3656
3657 let addr1 = "192.168.1.100:5000"
3659 .parse()
3660 .expect("Failed to parse test address");
3661 let addr2 = "8.8.8.8:5000"
3662 .parse()
3663 .expect("Failed to parse test address");
3664 manager
3665 .accept_quic_discovered_address(peer_id, addr1)
3666 .expect("Failed to accept address in test");
3667 manager
3668 .accept_quic_discovered_address(peer_id, addr2)
3669 .expect("Failed to accept address in test");
3670
3671 let status = manager
3673 .get_discovery_status(peer_id)
3674 .expect("Failed to get discovery status in test");
3675
3676 assert!(status.discovered_candidates.len() >= 2);
3678
3679 if let Some(session) = manager.active_sessions.get(&peer_id) {
3681 assert_eq!(
3683 session.statistics.bootstrap_queries_sent, 0,
3684 "Should not query bootstrap nodes when using QUIC discovery"
3685 );
3686 }
3687 }
3688
3689 #[test]
3690 fn test_priority_differences_quic_vs_placeholder() {
3691 let mut manager = create_test_manager();
3692 let peer_id = PeerId([1; 32]);
3693
3694 manager
3696 .start_discovery(peer_id, vec![])
3697 .expect("Failed to start discovery in test");
3698
3699 let discovered_addr = "8.8.8.8:5000"
3701 .parse()
3702 .expect("Failed to parse test address");
3703 manager
3704 .accept_quic_discovered_address(peer_id, discovered_addr)
3705 .expect("Failed to accept address in test");
3706
3707 if let Some(session) = manager.active_sessions.get(&peer_id) {
3709 let candidate = session
3710 .discovered_candidates
3711 .iter()
3712 .find(|c| c.address == discovered_addr)
3713 .expect("Should find the discovered address");
3714
3715 assert!(
3717 candidate.priority > 100,
3718 "QUIC-discovered address should have good priority"
3719 );
3720 assert!(candidate.priority < 300, "Priority should be reasonable");
3721
3722 assert!(matches!(
3724 candidate.source,
3725 DiscoverySourceType::ServerReflexive
3726 ));
3727 }
3728 }
3729
3730 #[test]
3731 fn test_quic_discovered_address_priority_calculation() {
3732 let mut manager = create_test_manager();
3734 let peer_id = PeerId([1; 32]);
3735
3736 manager
3738 .start_discovery(peer_id, vec![])
3739 .expect("Failed to start discovery in test");
3740
3741 let test_cases = vec![
3743 ("1.2.3.4:5678", (250, 260), "Public IPv4"),
3745 ("192.168.1.100:9000", (240, 250), "Private IPv4"),
3746 ("[2001:db8::1]:5678", (260, 280), "Global IPv6"),
3747 ("[fe80::1]:5678", (220, 240), "Link-local IPv6"),
3748 ("[fc00::1]:5678", (240, 260), "Unique local IPv6"),
3749 ("10.0.0.1:9000", (240, 250), "Private IPv4 (10.x)"),
3750 ("172.16.0.1:9000", (240, 250), "Private IPv4 (172.16.x)"),
3751 ];
3752
3753 for (addr_str, (min_priority, max_priority), description) in test_cases {
3754 let addr: SocketAddr = addr_str.parse().expect("Failed to parse test address");
3755 manager
3756 .accept_quic_discovered_address(peer_id, addr)
3757 .expect("Failed to accept address in test");
3758
3759 let session = manager
3760 .active_sessions
3761 .get(&peer_id)
3762 .expect("Session should exist in test");
3763 let candidate = session
3764 .discovered_candidates
3765 .iter()
3766 .find(|c| c.address == addr)
3767 .unwrap_or_else(|| panic!("No candidate found for {}", description));
3768
3769 assert!(
3770 candidate.priority >= min_priority && candidate.priority <= max_priority,
3771 "{} priority {} not in range [{}, {}]",
3772 description,
3773 candidate.priority,
3774 min_priority,
3775 max_priority
3776 );
3777 }
3778 }
3779
3780 #[test]
3781 fn test_quic_discovered_priority_factors() {
3782 let manager = create_test_manager();
3784
3785 let base_priority = manager.calculate_quic_discovered_priority(
3787 &"1.2.3.4:5678"
3788 .parse()
3789 .expect("Failed to parse test address"),
3790 );
3791 assert_eq!(
3792 base_priority, 255,
3793 "Base priority should be 255 for public IPv4"
3794 );
3795
3796 let ipv6_priority = manager.calculate_quic_discovered_priority(
3798 &"[2001:db8::1]:5678"
3799 .parse()
3800 .expect("Failed to parse test address"),
3801 );
3802 assert!(
3803 ipv6_priority > base_priority,
3804 "IPv6 should have higher priority than IPv4"
3805 );
3806
3807 let private_priority = manager.calculate_quic_discovered_priority(
3809 &"192.168.1.1:5678"
3810 .parse()
3811 .expect("Failed to parse test address"),
3812 );
3813 assert!(
3814 private_priority < base_priority,
3815 "Private addresses should have lower priority"
3816 );
3817
3818 let link_local_priority = manager.calculate_quic_discovered_priority(
3820 &"[fe80::1]:5678"
3821 .parse()
3822 .expect("Failed to parse test address"),
3823 );
3824 assert!(
3825 link_local_priority < private_priority,
3826 "Link-local should have lower priority than private"
3827 );
3828 }
3829
3830 #[test]
3831 fn test_quic_discovered_addresses_override_stale_server_reflexive() {
3832 let mut manager = create_test_manager();
3834 let peer_id = PeerId([1; 32]);
3835
3836 manager
3838 .start_discovery(peer_id, vec![])
3839 .expect("Failed to start discovery in test");
3840
3841 let session = manager
3843 .active_sessions
3844 .get_mut(&peer_id)
3845 .expect("Session should exist in test");
3846 let old_candidate = DiscoveryCandidate {
3847 address: "1.2.3.4:1234"
3848 .parse()
3849 .expect("Failed to parse test address"),
3850 priority: 200,
3851 source: DiscoverySourceType::ServerReflexive,
3852 state: CandidateState::Validating,
3853 };
3854 session.discovered_candidates.push(old_candidate);
3855
3856 let new_addr = "1.2.3.4:5678"
3858 .parse()
3859 .expect("Failed to parse test address");
3860 manager
3861 .accept_quic_discovered_address(peer_id, new_addr)
3862 .expect("Failed to accept address in test");
3863
3864 let session = manager
3866 .active_sessions
3867 .get(&peer_id)
3868 .expect("Session should exist in test");
3869 let candidates: Vec<_> = session
3870 .discovered_candidates
3871 .iter()
3872 .filter(|c| c.source == DiscoverySourceType::ServerReflexive)
3873 .collect();
3874
3875 assert_eq!(
3876 candidates.len(),
3877 2,
3878 "Should have both old and new candidates"
3879 );
3880
3881 let new_candidate = candidates
3883 .iter()
3884 .find(|c| c.address == new_addr)
3885 .expect("New candidate should be found");
3886 assert_ne!(
3887 new_candidate.priority, 200,
3888 "New candidate should have recalculated priority"
3889 );
3890 }
3891
3892 #[test]
3893 fn test_quic_discovered_address_generates_events() {
3894 let mut manager = create_test_manager();
3896 let peer_id = PeerId([1; 32]);
3897
3898 manager
3900 .start_discovery(peer_id, vec![])
3901 .expect("Failed to start discovery in test");
3902
3903 manager.poll_discovery_progress(peer_id);
3905
3906 let discovered_addr = "8.8.8.8:5000"
3908 .parse()
3909 .expect("Failed to parse test address");
3910 manager
3911 .accept_quic_discovered_address(peer_id, discovered_addr)
3912 .expect("Failed to accept address in test");
3913
3914 let events = manager.poll_discovery_progress(peer_id);
3916
3917 assert!(
3919 !events.is_empty(),
3920 "Should generate events for new QUIC-discovered address"
3921 );
3922
3923 let has_new_candidate = events.iter().any(|e| {
3925 matches!(e,
3926 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
3927 if candidate.address == discovered_addr
3928 )
3929 });
3930 assert!(
3931 has_new_candidate,
3932 "Should generate ServerReflexiveCandidateDiscovered event for the discovered address"
3933 );
3934 }
3935
3936 #[test]
3937 fn test_multiple_quic_discovered_addresses_generate_events() {
3938 let mut manager = create_test_manager();
3940 let peer_id = PeerId([1; 32]);
3941
3942 manager
3944 .start_discovery(peer_id, vec![])
3945 .expect("Failed to start discovery in test");
3946
3947 manager.poll_discovery_progress(peer_id);
3949
3950 let addresses = vec![
3952 "8.8.8.8:5000"
3953 .parse()
3954 .expect("Failed to parse test address"),
3955 "1.1.1.1:6000"
3956 .parse()
3957 .expect("Failed to parse test address"),
3958 "[2001:db8::1]:7000"
3959 .parse()
3960 .expect("Failed to parse test address"),
3961 ];
3962
3963 for addr in &addresses {
3964 manager
3965 .accept_quic_discovered_address(peer_id, *addr)
3966 .expect("Failed to accept address in test");
3967 }
3968
3969 let events = manager.poll_discovery_progress(peer_id);
3971
3972 for addr in &addresses {
3974 let has_event = events.iter().any(|e| {
3975 matches!(e,
3976 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
3977 if candidate.address == *addr
3978 )
3979 });
3980 assert!(has_event, "Should have event for address {addr}");
3981 }
3982 }
3983
3984 #[test]
3985 fn test_duplicate_quic_discovered_address_no_event() {
3986 let mut manager = create_test_manager();
3988 let peer_id = PeerId([1; 32]);
3989
3990 manager
3992 .start_discovery(peer_id, vec![])
3993 .expect("Failed to start discovery in test");
3994
3995 let discovered_addr = "8.8.8.8:5000"
3997 .parse()
3998 .expect("Failed to parse test address");
3999 manager
4000 .accept_quic_discovered_address(peer_id, discovered_addr)
4001 .expect("Failed to accept address in test");
4002
4003 manager.poll_discovery_progress(peer_id);
4005
4006 manager
4008 .accept_quic_discovered_address(peer_id, discovered_addr)
4009 .expect("Failed to accept address in test");
4010
4011 let events = manager.poll_discovery_progress(peer_id);
4013
4014 let has_duplicate_event = events.iter().any(|e| {
4016 matches!(e,
4017 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
4018 if candidate.address == discovered_addr
4019 )
4020 });
4021
4022 assert!(
4023 !has_duplicate_event,
4024 "Should not generate event for duplicate address"
4025 );
4026 }
4027
4028 #[test]
4029 fn test_quic_discovered_address_event_timing() {
4030 let mut manager = create_test_manager();
4032 let peer_id = PeerId([1; 32]);
4033
4034 manager
4036 .start_discovery(peer_id, vec![])
4037 .expect("Failed to start discovery in test");
4038
4039 manager.poll_discovery_progress(peer_id);
4041
4042 let addr1 = "8.8.8.8:5000"
4044 .parse()
4045 .expect("Failed to parse test address");
4046 let addr2 = "1.1.1.1:6000"
4047 .parse()
4048 .expect("Failed to parse test address");
4049
4050 manager
4051 .accept_quic_discovered_address(peer_id, addr1)
4052 .expect("Failed to accept address in test");
4053 manager
4054 .accept_quic_discovered_address(peer_id, addr2)
4055 .expect("Failed to accept address in test");
4056
4057 let events = manager.poll_discovery_progress(peer_id);
4060
4061 let server_reflexive_count = events
4063 .iter()
4064 .filter(|e| matches!(e, DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. }))
4065 .count();
4066
4067 assert!(
4068 server_reflexive_count >= 2,
4069 "Should deliver all queued events on poll, got {server_reflexive_count} events"
4070 );
4071
4072 let events2 = manager.poll_discovery_progress(peer_id);
4074 let server_reflexive_count2 = events2
4075 .iter()
4076 .filter(|e| matches!(e, DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. }))
4077 .count();
4078 assert_eq!(
4079 server_reflexive_count2, 0,
4080 "Server reflexive events should not be duplicated on subsequent polls"
4081 );
4082 }
4083
4084 #[test]
4085 fn test_is_valid_local_address() {
4086 let manager = create_test_manager();
4087
4088 assert!(
4090 manager.is_valid_local_address(
4091 &"192.168.1.1:8080"
4092 .parse()
4093 .expect("Failed to parse test address")
4094 )
4095 );
4096 assert!(
4097 manager.is_valid_local_address(
4098 &"10.0.0.1:8080"
4099 .parse()
4100 .expect("Failed to parse test address")
4101 )
4102 );
4103 assert!(
4104 manager.is_valid_local_address(
4105 &"172.16.0.1:8080"
4106 .parse()
4107 .expect("Failed to parse test address")
4108 )
4109 );
4110
4111 assert!(
4113 manager.is_valid_local_address(
4114 &"[2001:4860:4860::8888]:8080"
4115 .parse()
4116 .expect("Failed to parse test address")
4117 )
4118 );
4119 assert!(
4120 manager.is_valid_local_address(
4121 &"[fe80::1]:8080"
4122 .parse()
4123 .expect("Failed to parse test address")
4124 )
4125 ); assert!(
4127 manager.is_valid_local_address(
4128 &"[fc00::1]:8080"
4129 .parse()
4130 .expect("Failed to parse test address")
4131 )
4132 ); assert!(
4136 !manager.is_valid_local_address(
4137 &"0.0.0.0:8080"
4138 .parse()
4139 .expect("Failed to parse test address")
4140 )
4141 );
4142 assert!(
4143 !manager.is_valid_local_address(
4144 &"255.255.255.255:8080"
4145 .parse()
4146 .expect("Failed to parse test address")
4147 )
4148 );
4149 assert!(
4150 !manager.is_valid_local_address(
4151 &"224.0.0.1:8080"
4152 .parse()
4153 .expect("Failed to parse test address")
4154 )
4155 ); assert!(
4157 !manager.is_valid_local_address(
4158 &"0.0.0.1:8080"
4159 .parse()
4160 .expect("Failed to parse test address")
4161 )
4162 ); assert!(
4164 !manager.is_valid_local_address(
4165 &"240.0.0.1:8080"
4166 .parse()
4167 .expect("Failed to parse test address")
4168 )
4169 ); assert!(
4171 !manager.is_valid_local_address(
4172 &"[::]:8080".parse().expect("Failed to parse test address")
4173 )
4174 ); assert!(
4176 !manager.is_valid_local_address(
4177 &"[ff02::1]:8080"
4178 .parse()
4179 .expect("Failed to parse test address")
4180 )
4181 ); assert!(
4183 !manager.is_valid_local_address(
4184 &"[2001:db8::1]:8080"
4185 .parse()
4186 .expect("Failed to parse test address")
4187 )
4188 ); assert!(
4192 !manager.is_valid_local_address(
4193 &"192.168.1.1:0"
4194 .parse()
4195 .expect("Failed to parse test address")
4196 )
4197 );
4198
4199 #[cfg(test)]
4201 {
4202 assert!(
4203 manager.is_valid_local_address(
4204 &"127.0.0.1:8080"
4205 .parse()
4206 .expect("Failed to parse test address")
4207 )
4208 );
4209 assert!(manager.is_valid_local_address(
4210 &"[::1]:8080".parse().expect("Failed to parse test address")
4211 ));
4212 }
4213 }
4214
4215 #[test]
4216 fn test_validation_rejects_invalid_addresses() {}
4217
4218 #[test]
4219 fn test_candidate_validation_error_types() {
4220 use crate::nat_traversal_api::{CandidateAddress, CandidateValidationError};
4221
4222 assert!(matches!(
4224 CandidateAddress::validate_address(&"192.168.1.1:0".parse().unwrap()),
4225 Err(CandidateValidationError::InvalidPort(0))
4226 ));
4227
4228 assert!(matches!(
4229 CandidateAddress::validate_address(&"0.0.0.0:8080".parse().unwrap()),
4230 Err(CandidateValidationError::UnspecifiedAddress)
4231 ));
4232
4233 assert!(matches!(
4234 CandidateAddress::validate_address(&"255.255.255.255:8080".parse().unwrap()),
4235 Err(CandidateValidationError::BroadcastAddress)
4236 ));
4237
4238 assert!(matches!(
4239 CandidateAddress::validate_address(&"224.0.0.1:8080".parse().unwrap()),
4240 Err(CandidateValidationError::MulticastAddress)
4241 ));
4242
4243 assert!(matches!(
4244 CandidateAddress::validate_address(&"240.0.0.1:8080".parse().unwrap()),
4245 Err(CandidateValidationError::ReservedAddress)
4246 ));
4247
4248 assert!(matches!(
4249 CandidateAddress::validate_address(&"[2001:db8::1]:8080".parse().unwrap()),
4250 Err(CandidateValidationError::DocumentationAddress)
4251 ));
4252
4253 assert!(matches!(
4254 CandidateAddress::validate_address(&"[::ffff:192.168.1.1]:8080".parse().unwrap()),
4255 Err(CandidateValidationError::IPv4MappedAddress)
4256 ));
4257 }
4258}