1use std::{
10 collections::{HashMap, VecDeque},
11 net::{IpAddr, SocketAddr},
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use tracing::{debug, error, info, warn};
17
18use crate::Connection;
19
20use crate::{
21 connection::nat_traversal::{CandidateSource, CandidateState},
22 nat_traversal_api::{BootstrapNode, CandidateAddress, PeerId},
23};
24
// Platform-specific interface discovery modules. Each module and its
// re-export is compiled only when building for the matching `target_os`.

// Windows: public module and public re-export.
#[cfg(target_os = "windows")]
pub mod windows;

#[cfg(target_os = "windows")]
pub use windows::WindowsInterfaceDiscovery;

// Linux: public module and public re-export.
#[cfg(target_os = "linux")]
pub mod linux;

#[cfg(target_os = "linux")]
pub use linux::LinuxInterfaceDiscovery;

// macOS: unlike the other two platforms, the module and re-export are
// crate-private.
#[cfg(target_os = "macos")]
pub(crate) mod macos;

#[cfg(target_os = "macos")]
pub(crate) use macos::MacOSInterfaceDiscovery;
43
44fn convert_to_nat_source(discovery_source: DiscoverySourceType) -> CandidateSource {
46 match discovery_source {
47 DiscoverySourceType::Local => CandidateSource::Local,
48 DiscoverySourceType::ServerReflexive => CandidateSource::Observed { by_node: None },
49 DiscoverySourceType::Predicted => CandidateSource::Predicted,
50 }
51}
52
/// How a candidate address was obtained during discovery.
///
/// Converted to the NAT traversal `CandidateSource` by
/// `convert_to_nat_source`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DiscoverySourceType {
    /// Address taken from a local network interface or the bound address.
    Local,
    /// Address observed by a bootstrap node (a server-reflexive response).
    ServerReflexive,
    /// Predicted address; maps to `CandidateSource::Predicted`.
    Predicted,
}
60
/// A candidate address tracked by a discovery session.
///
/// Convertible to the public `CandidateAddress` via
/// `to_candidate_address`.
#[derive(Debug, Clone)]
pub(crate) struct DiscoveryCandidate {
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// Priority value carried over unchanged into `CandidateAddress`.
    pub priority: u32,
    /// How the address was obtained.
    pub source: DiscoverySourceType,
    /// Validation state; new candidates are created with `CandidateState::New`
    /// and set to `CandidateState::Valid` by `handle_path_response`.
    pub state: CandidateState,
}
69
70impl DiscoveryCandidate {
71 pub(crate) fn to_candidate_address(&self) -> CandidateAddress {
73 CandidateAddress {
74 address: self.address,
75 priority: self.priority,
76 source: convert_to_nat_source(self.source),
77 state: self.state,
78 }
79 }
80}
81
/// Per-peer discovery state owned by `CandidateDiscoveryManager`.
#[derive(Debug)]
#[allow(dead_code)]
pub struct DiscoverySession {
    /// Peer this session belongs to; also the key in `active_sessions`.
    peer_id: PeerId,
    /// Random identifier generated when the session is created.
    session_id: u64,
    /// Current phase; starts as `DiscoveryPhase::Idle`.
    current_phase: DiscoveryPhase,
    /// Creation time of the session, used for duration reporting.
    started_at: Instant,
    /// Candidates gathered so far, in discovery order.
    discovered_candidates: Vec<DiscoveryCandidate>,
    /// Counters updated as candidates are found.
    statistics: DiscoveryStatistics,
    /// Observed port allocations; capped at 20 entries by
    /// `process_server_reflexive_response_for_session`.
    allocation_history: VecDeque<PortAllocationEvent>,
    /// Server-reflexive query helper built from the discovery config.
    server_reflexive_discovery: ServerReflexiveDiscovery,
}
103
/// Coordinates candidate discovery sessions for multiple peers.
#[allow(dead_code)]
pub struct CandidateDiscoveryManager {
    /// Discovery configuration; `bound_address` may be updated via
    /// `set_bound_address`.
    config: DiscoveryConfig,
    /// Platform interface scanner, shared behind a mutex.
    interface_discovery: Arc<std::sync::Mutex<Box<dyn NetworkInterfaceDiscovery + Send>>>,
    /// Port-allocation pattern analyser, shared behind a mutex.
    symmetric_predictor: Arc<std::sync::Mutex<SymmetricNatPredictor>>,
    /// Source of active bootstrap node ids and their addresses.
    bootstrap_manager: Arc<BootstrapNodeManager>,
    /// Discovery cache built from the config (not used in the visible code).
    cache: DiscoveryCache,
    /// Sessions currently tracked, keyed by peer.
    active_sessions: HashMap<PeerId, DiscoverySession>,
    /// Last local scan result with the time it was stored; cleared when the
    /// bound address changes.
    cached_local_candidates: Option<(Instant, Vec<ValidatedCandidate>)>,
    /// Maximum age of `cached_local_candidates`; initialised from
    /// `config.interface_cache_ttl`.
    local_cache_duration: Duration,
    /// Path validations awaiting a response, keyed by candidate id.
    pending_validations: HashMap<CandidateId, PendingValidation>,
}
126
/// Tunable parameters for candidate discovery.
///
/// The defaults quoted below come from the `Default` implementation.
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
    /// Total discovery timeout (default: 30 s).
    pub total_timeout: Duration,
    /// How long a session may stay in local interface scanning before it is
    /// treated as timed out (default: 2 s).
    pub local_scan_timeout: Duration,
    /// Bootstrap query timeout (default: 5 s).
    pub bootstrap_query_timeout: Duration,
    /// Maximum query retries (default: 3).
    pub max_query_retries: u32,
    /// Maximum number of candidates (default: 8).
    pub max_candidates: usize,
    /// When `false`, symmetric NAT prediction is skipped and discovery
    /// completes with the candidates already found (default: `true`).
    pub enable_symmetric_prediction: bool,
    /// Minimum bootstrap consensus (default: 2).
    pub min_bootstrap_consensus: usize,
    /// Lifetime of cached local interface candidates; copied into the
    /// manager's `local_cache_duration` (default: 60 s).
    pub interface_cache_ttl: Duration,
    /// Lifetime of cached server-reflexive results (default: 300 s).
    pub server_reflexive_cache_ttl: Duration,
    /// Address the endpoint is bound to, if known. When set it is offered as
    /// a local candidate with priority 60000 (default: `None`).
    pub bound_address: Option<SocketAddr>,
}
151
/// Phase of a discovery session, together with the data specific to it.
#[derive(Debug, Clone, PartialEq)]
pub enum DiscoveryPhase {
    /// Initial phase of a freshly created session.
    Idle,
    /// Local interfaces are being scanned; entered by `start_discovery`.
    LocalInterfaceScanning { started_at: Instant },
    /// Bootstrap nodes are being queried for the observed address.
    ServerReflexiveQuerying {
        /// When the querying phase began.
        started_at: Instant,
        /// Query state per bootstrap node.
        active_queries: HashMap<BootstrapNodeId, QueryState>,
        /// Responses collected so far; starts empty.
        responses_received: Vec<ServerReflexiveResponse>,
    },
    /// Port allocation patterns are being analysed for prediction.
    SymmetricNatPrediction {
        /// When the prediction phase began.
        started_at: Instant,
        /// Number of prediction attempts; starts at 0.
        prediction_attempts: u32,
        /// Result of the pattern analysis taken when the phase started.
        pattern_analysis: PatternAnalysisState,
    },
    /// Discovered candidates are being validated.
    CandidateValidation {
        /// When the validation phase began.
        started_at: Instant,
        /// Validation outcome per candidate; starts empty.
        validation_results: HashMap<CandidateId, ValidationResult>,
    },
    /// Discovery finished.
    Completed {
        /// Candidates produced by the session.
        final_candidates: Vec<ValidatedCandidate>,
        /// Time at which the session completed.
        completion_time: Instant,
    },
    /// Discovery failed.
    Failed {
        /// Reason for the failure.
        error: DiscoveryError,
        /// Time at which the failure was recorded.
        failed_at: Instant,
        /// Strategies suggested for recovering from the failure.
        fallback_options: Vec<FallbackStrategy>,
    },
}
188
/// Events produced while discovery sessions make progress.
#[derive(Debug, Clone)]
pub enum DiscoveryEvent {
    /// Discovery started for a peer.
    DiscoveryStarted {
        peer_id: PeerId,
        bootstrap_count: usize,
    },
    /// A local interface scan was started successfully.
    LocalScanningStarted,
    /// A local candidate (bound address or interface address) was recorded.
    LocalCandidateDiscovered { candidate: CandidateAddress },
    /// The local scanning phase ended (by completion, cache hit or timeout).
    LocalScanningCompleted {
        /// Number of local candidates reported for the session.
        candidate_count: usize,
        /// Time spent, as reported by the emitting code path.
        duration: Duration,
    },
    /// Queries were issued to `bootstrap_count` bootstrap nodes.
    ServerReflexiveDiscoveryStarted { bootstrap_count: usize },
    /// A bootstrap node reported an observed address.
    ServerReflexiveCandidateDiscovered {
        candidate: CandidateAddress,
        /// Address of the bootstrap node that produced the observation.
        bootstrap_node: SocketAddr,
    },
    /// A query to a bootstrap node failed.
    BootstrapQueryFailed {
        bootstrap_node: SocketAddr,
        error: String,
    },
    /// Symmetric NAT prediction started from the given base address.
    SymmetricPredictionStarted { base_address: SocketAddr },
    /// A predicted candidate was generated with the given confidence.
    PredictedCandidateGenerated {
        candidate: CandidateAddress,
        confidence: f64,
    },
    /// A port allocation was observed through a server-reflexive response.
    PortAllocationDetected {
        /// Port of the observed address.
        port: u16,
        /// The full observed address.
        source_address: SocketAddr,
        /// Bootstrap node that reported it.
        bootstrap_node: BootstrapNodeId,
        /// Timestamp carried by the response.
        timestamp: Instant,
    },
    /// A session completed.
    DiscoveryCompleted {
        candidate_count: usize,
        total_duration: Duration,
        success_rate: f64,
    },
    /// A session failed; `partial_results` holds what was found until then.
    DiscoveryFailed {
        error: DiscoveryError,
        partial_results: Vec<CandidateAddress>,
    },
    /// A path validation challenge should be sent to the candidate.
    PathValidationRequested {
        candidate_id: CandidateId,
        candidate_address: SocketAddr,
        /// Random token expected back in the response.
        challenge_token: u64,
    },
    /// A matching path validation response was received.
    PathValidationResponse {
        candidate_id: CandidateId,
        candidate_address: SocketAddr,
        challenge_token: u64,
        /// Time between starting the validation and receiving the response.
        rtt: Duration,
    },
}
257
/// Numeric identifier of a bootstrap node; usable as a hash-map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BootstrapNodeId(pub u64);
261
/// Bookkeeping for a path validation that has been requested but not yet
/// answered.
#[allow(dead_code)]
struct PendingValidation {
    /// Address being validated.
    candidate_address: SocketAddr,
    /// Token that the response must echo.
    challenge_token: u64,
    /// When the validation was started; used to compute the RTT.
    started_at: Instant,
    /// Attempt counter; set to 1 when the validation is first started.
    attempts: u32,
}
274
/// State of a single bootstrap query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryState {
    /// Query is outstanding; records when it was sent and the attempt count.
    Pending { sent_at: Instant, attempts: u32 },
    /// Query completed.
    Completed,
    /// Query failed.
    Failed,
}
285
/// An observed-address report received from a bootstrap node.
#[derive(Debug, Clone, PartialEq)]
pub struct ServerReflexiveResponse {
    /// Node that produced the report.
    pub bootstrap_node: BootstrapNodeId,
    /// Address the node observed; becomes a server-reflexive candidate.
    pub observed_address: SocketAddr,
    /// Response time of the query.
    pub response_time: Duration,
    /// Timestamp of the response; reused for the port allocation event.
    pub timestamp: Instant,
}
294
/// Snapshot of the port-allocation analysis stored in the
/// `SymmetricNatPrediction` phase.
#[derive(Debug, Clone, PartialEq)]
pub struct PatternAnalysisState {
    /// Copy of the session's allocation history at analysis time.
    pub allocation_history: VecDeque<PortAllocationEvent>,
    /// Pattern reported by the predictor, if any.
    pub detected_pattern: Option<PortAllocationPattern>,
    /// Confidence of the detected pattern, or 0.0 when none was detected.
    pub confidence_level: f64,
    /// Estimated prediction accuracy; 0.3 when no pattern was detected.
    pub prediction_accuracy: f64,
}
303
/// One observed external port allocation.
#[derive(Debug, Clone, PartialEq)]
pub struct PortAllocationEvent {
    /// Port of the observed address.
    pub port: u16,
    /// Timestamp of the response that reported it.
    pub timestamp: Instant,
    /// The full observed address.
    pub source_address: SocketAddr,
}
311
/// Description of a detected port-allocation pattern.
#[derive(Debug, Clone, PartialEq)]
pub struct PortAllocationPattern {
    /// Kind of pattern detected.
    pub pattern_type: AllocationPatternType,
    /// Base port of the pattern.
    pub base_port: u16,
    /// Stride between allocated ports.
    pub stride: u16,
    /// Optional pair of pool boundary ports.
    // NOTE(review): presumably (lower, upper) — confirm against the predictor.
    pub pool_boundaries: Option<(u16, u16)>,
    /// Confidence in the pattern; copied into
    /// `PatternAnalysisState::confidence_level`.
    pub confidence: f64,
}
321
/// Kinds of port-allocation pattern a predictor can report.
// NOTE(review): the variants are not interpreted anywhere in the visible
// code; the descriptions below restate the names only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AllocationPatternType {
    /// Sequential allocation.
    Sequential,
    /// Allocation with a fixed stride.
    FixedStride,
    /// Random allocation.
    Random,
    /// Pool-based allocation.
    PoolBased,
    /// Time-based allocation.
    TimeBased,
    /// Pattern could not be classified.
    Unknown,
}
338
/// Result of analysing a port-allocation pattern.
#[derive(Debug, Clone)]
pub struct PortPatternAnalysis {
    /// The analysed pattern.
    pub pattern: PortAllocationPattern,
    /// Signed port increment, when one was determined.
    pub increment: Option<i32>,
    /// Base port of the analysis.
    pub base_port: u16,
}
349
/// Numeric identifier of a candidate; usable as a hash-map key (for example
/// in the manager's pending validations).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CandidateId(pub u64);
353
/// Outcome of validating a candidate.
#[derive(Debug, Clone, PartialEq)]
pub enum ValidationResult {
    /// Candidate is reachable; carries the measured round-trip time.
    Valid { rtt: Duration },
    /// Candidate was rejected for the given reason.
    Invalid { reason: String },
    /// Validation timed out.
    Timeout,
    /// Validation is still in progress.
    Pending,
}
362
/// A candidate in the form returned by discovery results.
#[derive(Debug, Clone, PartialEq)]
pub struct ValidatedCandidate {
    /// Identifier of the candidate.
    pub id: CandidateId,
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// How the address was obtained.
    pub source: DiscoverySourceType,
    /// Priority value of the candidate.
    pub priority: u32,
    /// Measured round-trip time, if any.
    pub rtt: Option<Duration>,
    /// Reliability score; the visible code assigns 1.0 to local candidates
    /// and 0.5 to candidates taken from failed sessions.
    pub reliability_score: f64,
}
373
374impl ValidatedCandidate {
375 pub fn to_candidate_address(&self) -> CandidateAddress {
377 CandidateAddress {
378 address: self.address,
379 priority: self.priority,
380 source: convert_to_nat_source(self.source),
381 state: CandidateState::Valid,
382 }
383 }
384}
385
/// Crate-internal snapshot of per-session discovery data.
///
/// The fields mirror those of `DiscoverySession` except for the phase and the
/// server-reflexive helper.
// NOTE(review): not referenced anywhere in the visible code.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct DiscoverySessionState {
    /// Peer the state belongs to.
    pub peer_id: PeerId,
    /// Identifier of the session.
    pub session_id: u64,
    /// When the session started.
    pub started_at: Instant,
    /// Candidates gathered so far.
    pub discovered_candidates: Vec<DiscoveryCandidate>,
    /// Counters for the session.
    pub statistics: DiscoveryStatistics,
    /// Observed port allocations.
    pub allocation_history: VecDeque<PortAllocationEvent>,
}
397
/// Counters describing what a discovery session has done so far.
///
/// `Default` yields all-zero counters and `None` timings.
#[derive(Debug, Default, Clone)]
pub struct DiscoveryStatistics {
    /// Incremented once per local candidate recorded.
    pub local_candidates_found: u32,
    /// Incremented once per server-reflexive candidate recorded.
    pub server_reflexive_candidates_found: u32,
    /// Number of predicted candidates generated.
    pub predicted_candidates_generated: u32,
    /// Number of bootstrap queries sent.
    pub bootstrap_queries_sent: u32,
    /// Number of bootstrap queries that succeeded.
    pub bootstrap_queries_successful: u32,
    /// Total discovery time, once known.
    pub total_discovery_time: Option<Duration>,
    /// Average bootstrap round-trip time, once known.
    pub average_bootstrap_rtt: Option<Duration>,
}
409
/// Errors reported by candidate discovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryError {
    /// A completed interface scan produced no addresses.
    NoLocalInterfaces,
    /// Every bootstrap query failed.
    AllBootstrapsFailed,
    /// Discovery did not finish within its time limit.
    DiscoveryTimeout,
    /// Fewer candidates were found than required.
    InsufficientCandidates { found: usize, required: usize },
    /// Network-level failure, e.g. the interface scan could not be started.
    NetworkError(String),
    /// Invalid configuration.
    ConfigurationError(String),
    /// Internal inconsistency, e.g. discovery already running for a peer.
    InternalError(String),
}
428
/// Recovery options attached to a failed discovery phase.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FallbackStrategy {
    /// Fall back to cached results; the option recorded on session timeout.
    UseCachedResults,
    /// Retry discovery with relaxed parameters.
    RetryWithRelaxedParams,
    /// Proceed with a minimal set of candidates.
    UseMinimalCandidates,
    /// Enable relay fallback.
    EnableRelayFallback,
}
441
442impl Default for DiscoveryConfig {
443 fn default() -> Self {
444 Self {
445 total_timeout: Duration::from_secs(30),
446 local_scan_timeout: Duration::from_secs(2),
447 bootstrap_query_timeout: Duration::from_secs(5),
448 max_query_retries: 3,
449 max_candidates: 8,
450 enable_symmetric_prediction: true,
451 min_bootstrap_consensus: 2,
452 interface_cache_ttl: Duration::from_secs(60),
453 server_reflexive_cache_ttl: Duration::from_secs(300),
454 bound_address: None,
455 }
456 }
457}
458
459impl DiscoverySession {
460 fn new(peer_id: PeerId, config: &DiscoveryConfig) -> Self {
462 Self {
463 peer_id,
464 session_id: rand::random(),
465 current_phase: DiscoveryPhase::Idle,
466 started_at: Instant::now(),
467 discovered_candidates: Vec::new(),
468 statistics: DiscoveryStatistics::default(),
469 allocation_history: VecDeque::new(),
470 server_reflexive_discovery: ServerReflexiveDiscovery::new(config),
471 }
472 }
473}
474
475#[allow(dead_code)]
476impl CandidateDiscoveryManager {
477 pub fn new(config: DiscoveryConfig) -> Self {
479 let interface_discovery =
480 Arc::new(std::sync::Mutex::new(create_platform_interface_discovery()));
481 let symmetric_predictor =
482 Arc::new(std::sync::Mutex::new(SymmetricNatPredictor::new(&config)));
483 let bootstrap_manager = Arc::new(BootstrapNodeManager::new(&config));
484 let cache = DiscoveryCache::new(&config);
485 let local_cache_duration = config.interface_cache_ttl;
486
487 Self {
488 config,
489 interface_discovery,
490 symmetric_predictor,
491 bootstrap_manager,
492 cache,
493 active_sessions: HashMap::new(),
494 cached_local_candidates: None,
495 local_cache_duration,
496 pending_validations: HashMap::new(),
497 }
498 }
499
500 pub fn set_bound_address(&mut self, address: SocketAddr) {
502 self.config.bound_address = Some(address);
503 self.cached_local_candidates = None;
505 }
506
507 pub fn discover_local_candidates(&mut self) -> Result<Vec<ValidatedCandidate>, DiscoveryError> {
509 self.interface_discovery
511 .lock()
512 .unwrap()
513 .start_scan()
514 .map_err(|e| {
515 DiscoveryError::NetworkError(format!("Failed to start interface scan: {}", e))
516 })?;
517
518 let start = Instant::now();
520 let timeout = Duration::from_secs(2);
521
522 loop {
523 if start.elapsed() > timeout {
524 return Err(DiscoveryError::DiscoveryTimeout);
525 }
526
527 if let Some(interfaces) = self
528 .interface_discovery
529 .lock()
530 .unwrap()
531 .check_scan_complete()
532 {
533 let mut candidates = Vec::new();
535
536 for interface in interfaces {
537 for addr in interface.addresses {
538 candidates.push(ValidatedCandidate {
539 id: CandidateId(rand::random()),
540 address: addr,
541 source: DiscoverySourceType::Local,
542 priority: 50000, rtt: None,
544 reliability_score: 1.0,
545 });
546 }
547 }
548
549 if candidates.is_empty() {
550 return Err(DiscoveryError::NoLocalInterfaces);
551 }
552
553 return Ok(candidates);
554 }
555
556 std::thread::sleep(Duration::from_millis(10));
558 }
559 }
560
561 pub fn start_discovery(
563 &mut self,
564 peer_id: PeerId,
565 _bootstrap_nodes: Vec<BootstrapNode>,
566 ) -> Result<(), DiscoveryError> {
567 if self.active_sessions.contains_key(&peer_id) {
569 return Err(DiscoveryError::InternalError(format!(
570 "Discovery already in progress for peer {:?}",
571 peer_id
572 )));
573 }
574
575 info!("Starting candidate discovery for peer {:?}", peer_id);
576
577 let mut session = DiscoverySession::new(peer_id, &self.config);
579
580 session.current_phase = DiscoveryPhase::LocalInterfaceScanning {
585 started_at: Instant::now(),
586 };
587
588 self.active_sessions.insert(peer_id, session);
590
591 Ok(())
592 }
593
594 pub fn poll(&mut self, now: Instant) -> Vec<DiscoveryEvent> {
596 let mut all_events = Vec::new();
597 let mut completed_sessions = Vec::new();
598
599 let mut local_scan_events = Vec::new();
602 for (peer_id, session) in &mut self.active_sessions {
603 match &session.current_phase {
604 DiscoveryPhase::LocalInterfaceScanning { started_at } => {
605 if started_at.elapsed() > self.config.local_scan_timeout {
607 local_scan_events.push((
608 *peer_id,
609 DiscoveryEvent::LocalScanningCompleted {
610 candidate_count: 0,
611 duration: started_at.elapsed(),
612 },
613 ));
614 }
615 }
616 _ => {}
617 }
618 }
619
620 for (peer_id, event) in local_scan_events {
622 all_events.push(event);
623 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
624 session.current_phase = DiscoveryPhase::Completed {
626 final_candidates: session
627 .discovered_candidates
628 .iter()
629 .map(|dc| ValidatedCandidate {
630 id: CandidateId(0),
631 address: dc.address,
632 source: dc.source,
633 priority: dc.priority,
634 rtt: None,
635 reliability_score: 1.0,
636 })
637 .collect(),
638 completion_time: now,
639 };
640
641 all_events.push(DiscoveryEvent::DiscoveryCompleted {
642 candidate_count: session.discovered_candidates.len(),
643 total_duration: now.duration_since(session.started_at),
644 success_rate: 1.0,
645 });
646
647 completed_sessions.push(peer_id);
648 }
649 }
650
651 for peer_id in completed_sessions {
653 self.active_sessions.remove(&peer_id);
654 debug!("Removed completed discovery session for peer {:?}", peer_id);
655 }
656
657 all_events
658 }
659
660 pub fn get_status(&self) -> DiscoveryStatus {
662 DiscoveryStatus {
664 phase: DiscoveryPhase::Idle,
665 discovered_candidates: Vec::new(),
666 statistics: DiscoveryStatistics::default(),
667 elapsed_time: Duration::from_secs(0),
668 }
669 }
670
671 pub fn is_complete(&self) -> bool {
673 self.active_sessions.values().all(|session| {
675 matches!(
676 session.current_phase,
677 DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. }
678 )
679 })
680 }
681
682 pub fn get_results(&self) -> Option<DiscoveryResults> {
684 if self.active_sessions.is_empty() {
686 return None;
687 }
688
689 let mut all_candidates = Vec::new();
691 let mut latest_completion = Instant::now();
692 let mut combined_stats = DiscoveryStatistics::default();
693
694 for session in self.active_sessions.values() {
695 match &session.current_phase {
696 DiscoveryPhase::Completed {
697 final_candidates,
698 completion_time,
699 } => {
700 all_candidates.extend(final_candidates.clone());
702 latest_completion = *completion_time;
703 combined_stats.local_candidates_found +=
705 session.statistics.local_candidates_found;
706 combined_stats.server_reflexive_candidates_found +=
707 session.statistics.server_reflexive_candidates_found;
708 combined_stats.predicted_candidates_generated +=
709 session.statistics.predicted_candidates_generated;
710 combined_stats.bootstrap_queries_sent +=
711 session.statistics.bootstrap_queries_sent;
712 combined_stats.bootstrap_queries_successful +=
713 session.statistics.bootstrap_queries_successful;
714 }
715 DiscoveryPhase::Failed { .. } => {
716 let validated: Vec<ValidatedCandidate> = session
719 .discovered_candidates
720 .iter()
721 .enumerate()
722 .map(|(idx, dc)| ValidatedCandidate {
723 id: CandidateId(idx as u64),
724 address: dc.address,
725 source: dc.source,
726 priority: dc.priority,
727 rtt: None,
728 reliability_score: 0.5, })
730 .collect();
731 all_candidates.extend(validated);
732 }
733 _ => {}
734 }
735 }
736
737 if all_candidates.is_empty() {
738 None
739 } else {
740 Some(DiscoveryResults {
741 candidates: all_candidates,
742 completion_time: latest_completion,
743 statistics: combined_stats,
744 })
745 }
746 }
747
748 pub fn get_candidates_for_peer(&self, peer_id: PeerId) -> Vec<CandidateAddress> {
750 if let Some(session) = self.active_sessions.get(&peer_id) {
752 session
754 .discovered_candidates
755 .iter()
756 .map(|c| c.to_candidate_address())
757 .collect()
758 } else {
759 debug!("No active discovery session found for peer {:?}", peer_id);
761 Vec::new()
762 }
763 }
764
765 fn poll_session_local_scanning(
768 &mut self,
769 session: &mut DiscoverySession,
770 started_at: Instant,
771 now: Instant,
772 events: &mut Vec<DiscoveryEvent>,
773 ) {
774 if let Some((cache_time, ref cached_candidates)) = self.cached_local_candidates {
776 if cache_time.elapsed() < self.local_cache_duration {
777 debug!(
779 "Using cached local candidates for peer {:?}",
780 session.peer_id
781 );
782 self.process_cached_local_candidates(
783 session,
784 cached_candidates.clone(),
785 events,
786 now,
787 );
788 return;
789 }
790 }
791
792 if started_at.elapsed().as_millis() < 10 {
795 let scan_result = self.interface_discovery.lock().unwrap().start_scan();
796 match scan_result {
797 Ok(()) => {
798 debug!(
799 "Started local interface scan for peer {:?}",
800 session.peer_id
801 );
802 events.push(DiscoveryEvent::LocalScanningStarted);
803 }
804 Err(e) => {
805 error!("Failed to start interface scan: {}", e);
806 self.handle_session_local_scan_timeout(session, events, now);
807 return;
808 }
809 }
810 }
811
812 if started_at.elapsed() > self.config.local_scan_timeout {
814 warn!(
815 "Local interface scanning timeout for peer {:?}",
816 session.peer_id
817 );
818 self.handle_session_local_scan_timeout(session, events, now);
819 return;
820 }
821
822 let scan_complete_result = self
824 .interface_discovery
825 .lock()
826 .unwrap()
827 .check_scan_complete();
828 if let Some(interfaces) = scan_complete_result {
829 self.process_session_local_interfaces(session, interfaces, events, now);
830 }
831 }
832
833 fn process_session_local_interfaces(
834 &mut self,
835 session: &mut DiscoverySession,
836 interfaces: Vec<NetworkInterface>,
837 events: &mut Vec<DiscoveryEvent>,
838 now: Instant,
839 ) {
840 debug!(
841 "Processing {} network interfaces for peer {:?}",
842 interfaces.len(),
843 session.peer_id
844 );
845
846 let mut validated_candidates = Vec::new();
847
848 if let Some(bound_addr) = self.config.bound_address {
850 if self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback() {
851 let candidate = DiscoveryCandidate {
852 address: bound_addr,
853 priority: 60000, source: DiscoverySourceType::Local,
855 state: CandidateState::New,
856 };
857
858 session.discovered_candidates.push(candidate.clone());
859 session.statistics.local_candidates_found += 1;
860
861 validated_candidates.push(ValidatedCandidate {
863 id: CandidateId(rand::random()),
864 address: bound_addr,
865 source: DiscoverySourceType::Local,
866 priority: candidate.priority,
867 rtt: None,
868 reliability_score: 1.0,
869 });
870
871 events.push(DiscoveryEvent::LocalCandidateDiscovered {
872 candidate: candidate.to_candidate_address(),
873 });
874
875 debug!(
876 "Added bound address {} as local candidate for peer {:?}",
877 bound_addr, session.peer_id
878 );
879 }
880 }
881
882 for interface in &interfaces {
884 for address in &interface.addresses {
885 if Some(*address) == self.config.bound_address {
887 continue;
888 }
889
890 if self.is_valid_local_address(&address) {
891 let candidate = DiscoveryCandidate {
892 address: *address,
893 priority: self.calculate_local_priority(address, &interface),
894 source: DiscoverySourceType::Local,
895 state: CandidateState::New,
896 };
897
898 session.discovered_candidates.push(candidate.clone());
899 session.statistics.local_candidates_found += 1;
900
901 validated_candidates.push(ValidatedCandidate {
903 id: CandidateId(rand::random()),
904 address: *address,
905 source: DiscoverySourceType::Local,
906 priority: candidate.priority,
907 rtt: None,
908 reliability_score: 1.0,
909 });
910
911 events.push(DiscoveryEvent::LocalCandidateDiscovered {
912 candidate: candidate.to_candidate_address(),
913 });
914 }
915 }
916 }
917
918 self.cached_local_candidates = Some((now, validated_candidates));
920
921 events.push(DiscoveryEvent::LocalScanningCompleted {
922 candidate_count: session.statistics.local_candidates_found as usize,
923 duration: now.duration_since(session.started_at),
924 });
925
926 self.start_session_server_reflexive_discovery(session, events, now);
928 }
929
930 fn process_cached_local_candidates(
931 &mut self,
932 session: &mut DiscoverySession,
933 mut cached_candidates: Vec<ValidatedCandidate>,
934 events: &mut Vec<DiscoveryEvent>,
935 now: Instant,
936 ) {
937 if let Some(bound_addr) = self.config.bound_address {
939 let has_bound_addr = cached_candidates.iter().any(|c| c.address == bound_addr);
940 if !has_bound_addr
941 && (self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback())
942 {
943 cached_candidates.insert(
944 0,
945 ValidatedCandidate {
946 id: CandidateId(rand::random()),
947 address: bound_addr,
948 source: DiscoverySourceType::Local,
949 priority: 60000, rtt: None,
951 reliability_score: 1.0,
952 },
953 );
954 }
955 }
956
957 debug!(
958 "Using {} cached local candidates for peer {:?}",
959 cached_candidates.len(),
960 session.peer_id
961 );
962
963 for validated in cached_candidates {
964 let candidate = DiscoveryCandidate {
965 address: validated.address,
966 priority: validated.priority,
967 source: validated.source.clone(),
968 state: CandidateState::New,
969 };
970
971 session.discovered_candidates.push(candidate.clone());
972 session.statistics.local_candidates_found += 1;
973
974 events.push(DiscoveryEvent::LocalCandidateDiscovered {
975 candidate: candidate.to_candidate_address(),
976 });
977 }
978
979 events.push(DiscoveryEvent::LocalScanningCompleted {
980 candidate_count: session.statistics.local_candidates_found as usize,
981 duration: now.duration_since(session.started_at),
982 });
983
984 self.start_session_server_reflexive_discovery(session, events, now);
986 }
987
988 fn start_session_server_reflexive_discovery(
989 &mut self,
990 session: &mut DiscoverySession,
991 events: &mut Vec<DiscoveryEvent>,
992 now: Instant,
993 ) {
994 let has_quic_discovered = session.discovered_candidates.iter()
996 .any(|c| c.source == DiscoverySourceType::ServerReflexive);
997
998 if has_quic_discovered {
999 info!(
1000 "Skipping server reflexive discovery for peer {:?}, using QUIC-discovered addresses",
1001 session.peer_id
1002 );
1003 self.complete_session_discovery_with_local_candidates(session, events, now);
1005 return;
1006 }
1007
1008 let bootstrap_node_ids = self.bootstrap_manager.get_active_bootstrap_nodes();
1009
1010 if bootstrap_node_ids.is_empty() {
1011 info!(
1012 "No bootstrap nodes available for server reflexive discovery for peer {:?}, completing with local candidates only",
1013 session.peer_id
1014 );
1015 self.complete_session_discovery_with_local_candidates(session, events, now);
1017 return;
1018 }
1019
1020 let bootstrap_nodes_with_addresses: Vec<(BootstrapNodeId, SocketAddr)> = bootstrap_node_ids
1022 .iter()
1023 .filter_map(|&node_id| {
1024 self.bootstrap_manager
1025 .get_bootstrap_address(node_id)
1026 .map(|addr| (node_id, addr))
1027 })
1028 .collect();
1029
1030 if bootstrap_nodes_with_addresses.is_empty() {
1031 warn!("No bootstrap node addresses available for server reflexive discovery");
1032 self.complete_session_discovery_with_local_candidates(session, events, now);
1034 return;
1035 }
1036
1037 let active_queries = session
1039 .server_reflexive_discovery
1040 .start_queries_with_addresses(&bootstrap_nodes_with_addresses, now);
1041
1042 events.push(DiscoveryEvent::ServerReflexiveDiscoveryStarted {
1043 bootstrap_count: bootstrap_nodes_with_addresses.len(),
1044 });
1045
1046 session.current_phase = DiscoveryPhase::ServerReflexiveQuerying {
1047 started_at: now,
1048 active_queries,
1049 responses_received: Vec::new(),
1050 };
1051 }
1052
1053 fn process_server_reflexive_response_for_session(
1054 &mut self,
1055 session: &mut DiscoverySession,
1056 response: &ServerReflexiveResponse,
1057 events: &mut Vec<DiscoveryEvent>,
1058 ) {
1059 debug!("Received server reflexive response: {:?}", response);
1060
1061 let allocation_event = PortAllocationEvent {
1063 port: response.observed_address.port(),
1064 timestamp: response.timestamp,
1065 source_address: response.observed_address,
1066 };
1067
1068 if let DiscoveryPhase::ServerReflexiveQuerying { .. } = &mut session.current_phase {
1070 session
1073 .allocation_history
1074 .push_back(allocation_event.clone());
1075
1076 if session.allocation_history.len() > 20 {
1078 session.allocation_history.pop_front();
1079 }
1080 }
1081
1082 let candidate = DiscoveryCandidate {
1083 address: response.observed_address,
1084 priority: self.calculate_server_reflexive_priority(response),
1085 source: DiscoverySourceType::ServerReflexive,
1086 state: CandidateState::New,
1087 };
1088
1089 session.discovered_candidates.push(candidate.clone());
1090 session.statistics.server_reflexive_candidates_found += 1;
1091
1092 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1093 candidate: candidate.to_candidate_address(),
1094 bootstrap_node: self
1095 .bootstrap_manager
1096 .get_bootstrap_address(response.bootstrap_node)
1097 .unwrap_or_else(|| "unknown".parse().unwrap()),
1098 });
1099
1100 events.push(DiscoveryEvent::PortAllocationDetected {
1101 port: allocation_event.port,
1102 source_address: allocation_event.source_address,
1103 bootstrap_node: response.bootstrap_node,
1104 timestamp: allocation_event.timestamp,
1105 });
1106 }
1107
1108 fn start_session_symmetric_prediction(
1109 &mut self,
1110 session: &mut DiscoverySession,
1111 responses: &[ServerReflexiveResponse],
1112 events: &mut Vec<DiscoveryEvent>,
1113 now: Instant,
1114 ) {
1115 if !self.config.enable_symmetric_prediction || responses.is_empty() {
1116 self.complete_session_discovery_with_local_candidates(session, events, now);
1118 return;
1119 }
1120
1121 let base_address = self.calculate_consensus_address(responses);
1123
1124 events.push(DiscoveryEvent::SymmetricPredictionStarted { base_address });
1125
1126 let detected_pattern = self
1128 .symmetric_predictor
1129 .lock()
1130 .unwrap()
1131 .analyze_allocation_patterns(&session.allocation_history);
1132
1133 let confidence_level = detected_pattern
1134 .as_ref()
1135 .map(|p| p.confidence)
1136 .unwrap_or(0.0);
1137
1138 let prediction_accuracy = if let Some(ref pattern) = detected_pattern {
1140 self.calculate_prediction_accuracy(pattern, &session.allocation_history)
1141 } else {
1142 0.3 };
1144
1145 debug!(
1146 "Symmetric NAT pattern analysis: detected_pattern={:?}, confidence={:.2}, accuracy={:.2}",
1147 detected_pattern, confidence_level, prediction_accuracy
1148 );
1149
1150 session.current_phase = DiscoveryPhase::SymmetricNatPrediction {
1151 started_at: now,
1152 prediction_attempts: 0,
1153 pattern_analysis: PatternAnalysisState {
1154 allocation_history: session.allocation_history.clone(),
1155 detected_pattern,
1156 confidence_level,
1157 prediction_accuracy,
1158 },
1159 };
1160 }
1161
1162 fn start_session_candidate_validation(
1163 &mut self,
1164 session: &mut DiscoverySession,
1165 _events: &mut Vec<DiscoveryEvent>,
1166 now: Instant,
1167 ) {
1168 debug!(
1169 "Starting candidate validation for {} candidates",
1170 session.discovered_candidates.len()
1171 );
1172
1173 session.current_phase = DiscoveryPhase::CandidateValidation {
1174 started_at: now,
1175 validation_results: HashMap::new(),
1176 };
1177 }
1178
1179 fn start_path_validation(
1181 &mut self,
1182 candidate_id: CandidateId,
1183 candidate_address: SocketAddr,
1184 now: Instant,
1185 events: &mut Vec<DiscoveryEvent>,
1186 ) {
1187 debug!(
1188 "Starting QUIC path validation for candidate {} at {}",
1189 candidate_id.0, candidate_address
1190 );
1191
1192 let challenge_token: u64 = rand::random();
1194
1195 self.pending_validations.insert(
1197 candidate_id,
1198 PendingValidation {
1199 candidate_address,
1200 challenge_token,
1201 started_at: now,
1202 attempts: 1,
1203 },
1204 );
1205
1206 events.push(DiscoveryEvent::PathValidationRequested {
1208 candidate_id,
1209 candidate_address,
1210 challenge_token,
1211 });
1212
1213 debug!(
1214 "PATH_CHALLENGE {:08x} requested for candidate {} at {}",
1215 challenge_token, candidate_id.0, candidate_address
1216 );
1217 }
1218
1219 pub fn handle_path_response(
1221 &mut self,
1222 candidate_address: SocketAddr,
1223 challenge_token: u64,
1224 now: Instant,
1225 ) -> Option<DiscoveryEvent> {
1226 let candidate_id = self
1228 .pending_validations
1229 .iter()
1230 .find(|(_, validation)| {
1231 validation.candidate_address == candidate_address
1232 && validation.challenge_token == challenge_token
1233 })
1234 .map(|(id, _)| *id)?;
1235
1236 let validation = self.pending_validations.remove(&candidate_id)?;
1238 let rtt = now.duration_since(validation.started_at);
1239
1240 debug!(
1241 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1242 candidate_id.0, candidate_address, rtt
1243 );
1244
1245 for session in self.active_sessions.values_mut() {
1247 if let Some(candidate) = session
1248 .discovered_candidates
1249 .iter_mut()
1250 .find(|c| c.address == candidate_address)
1251 {
1252 candidate.state = CandidateState::Valid;
1253 break;
1255 }
1256 }
1257
1258 Some(DiscoveryEvent::PathValidationResponse {
1259 candidate_id,
1260 candidate_address,
1261 challenge_token,
1262 rtt,
1263 })
1264 }
1265
1266 fn simulate_path_validation(
1268 &mut self,
1269 candidate_id: CandidateId,
1270 candidate_address: SocketAddr,
1271 _now: Instant,
1272 ) {
1273 let is_local = candidate_address.ip().is_loopback()
1275 || (candidate_address.ip().is_ipv4()
1276 && candidate_address.ip().to_string().starts_with("192.168."))
1277 || (candidate_address.ip().is_ipv4()
1278 && candidate_address.ip().to_string().starts_with("10."))
1279 || (candidate_address.ip().is_ipv4()
1280 && candidate_address.ip().to_string().starts_with("172."));
1281
1282 let is_server_reflexive = !is_local && !candidate_address.ip().is_unspecified();
1283
1284 debug!(
1287 "Simulated path validation for candidate {} at {} - local: {}, server_reflexive: {}",
1288 candidate_id.0, candidate_address, is_local, is_server_reflexive
1289 );
1290 }
1291
1292 fn simulate_validation_result(&self, address: &SocketAddr) -> ValidationResult {
1294 let is_local = address.ip().is_loopback()
1295 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("192.168."))
1296 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("10."))
1297 || (address.ip().is_ipv4() && address.ip().to_string().starts_with("172."));
1298
1299 if is_local {
1300 ValidationResult::Valid {
1302 rtt: Duration::from_millis(1),
1303 }
1304 } else if address.ip().is_unspecified() {
1305 ValidationResult::Invalid {
1307 reason: "Unspecified address".to_string(),
1308 }
1309 } else {
1310 ValidationResult::Valid {
1312 rtt: Duration::from_millis(50 + (address.port() % 100) as u64),
1313 }
1314 }
1315 }
1316
1317 fn calculate_reliability_score(&self, candidate: &DiscoveryCandidate, rtt: Duration) -> f64 {
1319 let mut score: f64 = 0.5; match candidate.source {
1323 DiscoverySourceType::Local => score += 0.3, DiscoverySourceType::ServerReflexive => score += 0.2, DiscoverySourceType::Predicted => score += 0.1, }
1327
1328 let rtt_ms = rtt.as_millis() as f64;
1330 if rtt_ms < 10.0 {
1331 score += 0.2;
1332 } else if rtt_ms < 50.0 {
1333 score += 0.1;
1334 } else if rtt_ms > 200.0 {
1335 score -= 0.1;
1336 }
1337
1338 if candidate.address.ip().is_ipv6() {
1340 score += 0.05; }
1342
1343 score.max(0.0).min(1.0)
1345 }
1346
1347 fn handle_session_timeout(
1350 &mut self,
1351 session: &mut DiscoverySession,
1352 events: &mut Vec<DiscoveryEvent>,
1353 now: Instant,
1354 ) {
1355 let error = DiscoveryError::DiscoveryTimeout;
1356 let partial_results = session
1357 .discovered_candidates
1358 .iter()
1359 .map(|c| c.to_candidate_address())
1360 .collect();
1361
1362 warn!(
1363 "Discovery failed for peer {:?}: discovery process timed out (found {} partial candidates)",
1364 session.peer_id,
1365 session.discovered_candidates.len()
1366 );
1367 events.push(DiscoveryEvent::DiscoveryFailed {
1368 error: error.clone(),
1369 partial_results,
1370 });
1371
1372 session.current_phase = DiscoveryPhase::Failed {
1373 error,
1374 failed_at: now,
1375 fallback_options: vec![FallbackStrategy::UseCachedResults],
1376 };
1377 }
1378
1379 fn handle_session_local_scan_timeout(
1380 &mut self,
1381 session: &mut DiscoverySession,
1382 events: &mut Vec<DiscoveryEvent>,
1383 now: Instant,
1384 ) {
1385 warn!(
1386 "Local interface scan timeout for peer {:?}, proceeding with available candidates",
1387 session.peer_id
1388 );
1389
1390 events.push(DiscoveryEvent::LocalScanningCompleted {
1391 candidate_count: session.statistics.local_candidates_found as usize,
1392 duration: now.duration_since(session.started_at),
1393 });
1394
1395 self.start_session_server_reflexive_discovery(session, events, now);
1396 }
1397
1398 fn poll_session_server_reflexive(
1399 &mut self,
1400 session: &mut DiscoverySession,
1401 _started_at: Instant,
1402 _active_queries: &HashMap<BootstrapNodeId, QueryState>,
1403 _responses_received: &[(BootstrapNodeId, ServerReflexiveResponse)],
1404 now: Instant,
1405 events: &mut Vec<DiscoveryEvent>,
1406 ) {
1407 let has_quic_discovered = session.discovered_candidates.iter()
1409 .any(|c| c.source == DiscoverySourceType::ServerReflexive);
1410
1411 if has_quic_discovered {
1412 self.complete_session_discovery_with_local_candidates(session, events, now);
1414 return;
1415 }
1416
1417 self.complete_session_discovery_with_local_candidates(session, events, now);
1420 }
1421
1422 fn poll_session_symmetric_prediction(
1423 &mut self,
1424 session: &mut DiscoverySession,
1425 _started_at: Instant,
1426 _prediction_attempts: u32,
1427 _pattern_analysis: &PatternAnalysisState,
1428 now: Instant,
1429 events: &mut Vec<DiscoveryEvent>,
1430 ) {
1431 self.complete_session_discovery_with_local_candidates(session, events, now);
1434 }
1435
1436 fn poll_session_candidate_validation(
1437 &mut self,
1438 session: &mut DiscoverySession,
1439 _started_at: Instant,
1440 _validation_results: &HashMap<CandidateId, ValidationResult>,
1441 now: Instant,
1442 events: &mut Vec<DiscoveryEvent>,
1443 ) {
1444 self.complete_session_discovery_with_local_candidates(session, events, now);
1447 }
1448
1449 fn complete_session_discovery_with_local_candidates(
1450 &mut self,
1451 session: &mut DiscoverySession,
1452 events: &mut Vec<DiscoveryEvent>,
1453 now: Instant,
1454 ) {
1455 let duration = now.duration_since(session.started_at);
1457 session.statistics.total_discovery_time = Some(duration);
1458
1459 let success_rate = if session.statistics.local_candidates_found > 0 {
1460 1.0
1461 } else {
1462 0.0
1463 };
1464
1465 let validated_candidates: Vec<ValidatedCandidate> = session
1467 .discovered_candidates
1468 .iter()
1469 .map(|dc| ValidatedCandidate {
1470 id: CandidateId(rand::random()),
1471 address: dc.address,
1472 source: dc.source.clone(),
1473 priority: dc.priority,
1474 rtt: None,
1475 reliability_score: 1.0,
1476 })
1477 .collect();
1478
1479 events.push(DiscoveryEvent::DiscoveryCompleted {
1480 candidate_count: validated_candidates.len(),
1481 total_duration: duration,
1482 success_rate,
1483 });
1484
1485 session.current_phase = DiscoveryPhase::Completed {
1486 final_candidates: validated_candidates,
1487 completion_time: now,
1488 };
1489
1490 info!(
1491 "Discovery completed with {} local candidates for peer {:?}",
1492 session.discovered_candidates.len(),
1493 session.peer_id
1494 );
1495 }
1496
1497 fn is_valid_local_address(&self, address: &SocketAddr) -> bool {
1498 match address.ip() {
1499 IpAddr::V4(ipv4) => {
1500 #[cfg(test)]
1502 if ipv4.is_loopback() {
1503 return true;
1504 }
1505 !ipv4.is_loopback() && !ipv4.is_unspecified()
1506 }
1507 IpAddr::V6(ipv6) => {
1508 #[cfg(test)]
1510 if ipv6.is_loopback() {
1511 return true;
1512 }
1513 !ipv6.is_loopback() && !ipv6.is_unspecified()
1514 }
1515 }
1516 }
1517
1518 fn calculate_local_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
1519 let mut priority = 100; match address.ip() {
1522 IpAddr::V4(ipv4) => {
1523 if ipv4.is_private() {
1524 priority += 50; }
1526 }
1527 IpAddr::V6(ipv6) => {
1528 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
1531 let segments = ipv6.segments();
1532 if segments[0] & 0xE000 == 0x2000 {
1533 priority += 60;
1535 } else if segments[0] & 0xFFC0 == 0xFE80 {
1536 priority += 20;
1538 } else if segments[0] & 0xFE00 == 0xFC00 {
1539 priority += 40;
1541 } else {
1542 priority += 30;
1544 }
1545 }
1546
1547 priority += 10; }
1550 }
1551
1552 if interface.is_wireless {
1553 priority -= 10; }
1555
1556 priority
1557 }
1558
1559 fn calculate_server_reflexive_priority(&self, response: &ServerReflexiveResponse) -> u32 {
1560 let mut priority = 200; if response.response_time < Duration::from_millis(50) {
1564 priority += 20;
1565 } else if response.response_time > Duration::from_millis(200) {
1566 priority -= 10;
1567 }
1568
1569 let age_bonus = if response.timestamp.elapsed().as_secs() < 60 {
1571 20
1572 } else {
1573 0
1574 };
1575 priority += age_bonus;
1576
1577 priority
1578 }
1579
1580 fn should_transition_to_prediction(
1581 &self,
1582 responses: &[ServerReflexiveResponse],
1583 _now: Instant,
1584 ) -> bool {
1585 responses.len() >= self.config.min_bootstrap_consensus.max(1)
1586 }
1587
1588 fn calculate_consensus_address(&self, responses: &[ServerReflexiveResponse]) -> SocketAddr {
1589 let mut address_counts: HashMap<SocketAddr, usize> = HashMap::new();
1591
1592 for response in responses {
1593 *address_counts.entry(response.observed_address).or_insert(0) += 1;
1594 }
1595
1596 address_counts
1597 .into_iter()
1598 .max_by_key(|(_, count)| *count)
1599 .map(|(addr, _)| addr)
1600 .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
1601 }
1602
1603 fn calculate_prediction_accuracy(
1605 &self,
1606 pattern: &PortAllocationPattern,
1607 history: &VecDeque<PortAllocationEvent>,
1608 ) -> f64 {
1609 if history.len() < 3 {
1610 return 0.3; }
1612
1613 let recent_ports: Vec<u16> = history
1615 .iter()
1616 .rev()
1617 .take(10)
1618 .map(|event| event.port)
1619 .collect();
1620
1621 let mut correct_predictions = 0;
1622 let total_predictions = recent_ports.len().saturating_sub(1);
1623
1624 if total_predictions == 0 {
1625 return 0.3;
1626 }
1627
1628 match pattern.pattern_type {
1629 AllocationPatternType::Sequential => {
1630 for i in 1..recent_ports.len() {
1632 if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == 1 {
1633 correct_predictions += 1;
1634 }
1635 }
1636 }
1637 AllocationPatternType::FixedStride => {
1638 for i in 1..recent_ports.len() {
1640 if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == pattern.stride {
1641 correct_predictions += 1;
1642 }
1643 }
1644 }
1645 AllocationPatternType::PoolBased => {
1646 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1648 for port in &recent_ports {
1649 if *port >= min_port && *port <= max_port {
1650 correct_predictions += 1;
1651 }
1652 }
1653 }
1654 }
1655 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1656 if recent_ports.len() >= 3 {
1658 let mean = recent_ports.iter().map(|&p| p as f64).sum::<f64>()
1659 / recent_ports.len() as f64;
1660 let variance = recent_ports
1661 .iter()
1662 .map(|&p| (p as f64 - mean).powi(2))
1663 .sum::<f64>()
1664 / recent_ports.len() as f64;
1665
1666 let normalized_variance = (variance / 10000.0).min(1.0); return 0.2 + (1.0 - normalized_variance) * 0.3; }
1670 }
1671 AllocationPatternType::TimeBased => {
1672 if history.len() >= 2 {
1674 let time_diffs: Vec<Duration> = history
1675 .iter()
1676 .collect::<Vec<_>>()
1677 .windows(2)
1678 .map(|w| w[1].timestamp.duration_since(w[0].timestamp))
1679 .collect();
1680
1681 if !time_diffs.is_empty() {
1682 let avg_diff =
1683 time_diffs.iter().sum::<Duration>() / time_diffs.len() as u32;
1684 let variance = time_diffs
1685 .iter()
1686 .map(|d| d.as_millis().abs_diff(avg_diff.as_millis()) as f64)
1687 .sum::<f64>()
1688 / time_diffs.len() as f64;
1689
1690 let normalized_variance = (variance / 1000.0).min(1.0); return 0.3 + (1.0 - normalized_variance) * 0.4; }
1694 }
1695 }
1696 }
1697
1698 let accuracy = if total_predictions > 0 {
1700 correct_predictions as f64 / total_predictions as f64
1701 } else {
1702 0.3
1703 };
1704
1705 let confidence_adjusted_accuracy = accuracy * pattern.confidence;
1707
1708 confidence_adjusted_accuracy.max(0.2).min(0.9)
1710 }
1711
1712 pub fn accept_quic_discovered_address(
1715 &mut self,
1716 peer_id: PeerId,
1717 discovered_address: SocketAddr,
1718 ) -> Result<(), DiscoveryError> {
1719 let priority = self.calculate_quic_discovered_priority(&discovered_address);
1721
1722 let session = self.active_sessions.get_mut(&peer_id)
1724 .ok_or_else(|| DiscoveryError::InternalError(
1725 format!("No active discovery session for peer {:?}", peer_id)
1726 ))?;
1727
1728 let already_exists = session.discovered_candidates.iter()
1730 .any(|c| c.address == discovered_address);
1731
1732 if already_exists {
1733 debug!("QUIC-discovered address {} already in candidates", discovered_address);
1734 return Ok(());
1735 }
1736
1737 info!("Accepting QUIC-discovered address: {}", discovered_address);
1738
1739 let candidate = DiscoveryCandidate {
1741 address: discovered_address,
1742 priority,
1743 source: DiscoverySourceType::ServerReflexive,
1744 state: CandidateState::New,
1745 };
1746
1747 session.discovered_candidates.push(candidate);
1749 session.statistics.server_reflexive_candidates_found += 1;
1750
1751 Ok(())
1752 }
1753
1754 fn calculate_quic_discovered_priority(&self, address: &SocketAddr) -> u32 {
1756 let mut priority = 255; match address.ip() {
1761 IpAddr::V4(ipv4) => {
1762 if ipv4.is_private() {
1763 priority -= 10; } else if ipv4.is_loopback() {
1765 priority -= 20; }
1767 }
1769 IpAddr::V6(ipv6) => {
1770 priority += 10; if ipv6.is_loopback() {
1774 priority -= 30; } else if ipv6.is_multicast() {
1776 priority -= 40; } else if ipv6.is_unspecified() {
1778 priority -= 50; } else {
1780 let segments = ipv6.segments();
1782 if segments[0] & 0xFFC0 == 0xFE80 {
1783 priority -= 30; } else if segments[0] & 0xFE00 == 0xFC00 {
1786 priority -= 10; }
1789 }
1791 }
1792 }
1793
1794 priority
1795 }
1796
1797 pub fn poll_discovery_progress(&mut self, peer_id: PeerId) -> Vec<DiscoveryEvent> {
1799 let mut events = Vec::new();
1800
1801 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
1802 for candidate in &session.discovered_candidates {
1804 if matches!(candidate.state, CandidateState::New) {
1805 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1806 candidate: candidate.to_candidate_address(),
1807 bootstrap_node: "0.0.0.0:0".parse().unwrap(), });
1809 }
1810 }
1811
1812 for candidate in &mut session.discovered_candidates {
1814 if matches!(candidate.state, CandidateState::New) {
1815 candidate.state = CandidateState::Validating;
1816 }
1817 }
1818 }
1819
1820 events
1821 }
1822
1823 pub fn get_discovery_status(&self, peer_id: PeerId) -> Option<DiscoveryStatus> {
1825 self.active_sessions.get(&peer_id).map(|session| {
1826 let discovered_candidates = session.discovered_candidates.iter()
1827 .map(|c| c.to_candidate_address())
1828 .collect();
1829
1830 DiscoveryStatus {
1831 phase: session.current_phase.clone(),
1832 discovered_candidates,
1833 statistics: session.statistics.clone(),
1834 elapsed_time: session.started_at.elapsed(),
1835 }
1836 })
1837 }
1838}
1839
/// Point-in-time snapshot of a discovery session, as built by `get_discovery_status`.
#[derive(Debug, Clone)]
pub struct DiscoveryStatus {
    /// Phase the session was in when the snapshot was taken.
    pub phase: DiscoveryPhase,
    /// Every candidate discovered so far, converted to `CandidateAddress`.
    pub discovered_candidates: Vec<CandidateAddress>,
    /// Copy of the session's statistics.
    pub statistics: DiscoveryStatistics,
    /// Time elapsed since the session started.
    pub elapsed_time: Duration,
}
1848
/// Outcome of a discovery run: the candidates together with timing and statistics.
// NOTE(review): not constructed in the visible part of this file; field meanings below
// are taken from their names and types — confirm against the producer.
#[derive(Debug, Clone)]
pub struct DiscoveryResults {
    /// Candidates produced by the run.
    pub candidates: Vec<ValidatedCandidate>,
    /// Instant associated with completion of the run.
    pub completion_time: Instant,
    /// Statistics gathered during the run.
    pub statistics: DiscoveryStatistics,
}
1856
/// Platform abstraction for enumerating network interfaces in two steps: start a scan,
/// then poll for its result.
pub trait NetworkInterfaceDiscovery {
    /// Starts an interface scan, returning an error description on failure.
    fn start_scan(&mut self) -> Result<(), String>;
    /// Returns the scanned interfaces when they are available.
    // NOTE(review): presumably `None` means the scan is still in progress — confirm
    // against the platform implementations.
    fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>>;
}
1864
/// Description of one network interface as reported by a `NetworkInterfaceDiscovery`
/// implementation.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkInterface {
    /// Interface name.
    pub name: String,
    /// Socket addresses associated with the interface.
    pub addresses: Vec<SocketAddr>,
    /// Whether the interface is up.
    pub is_up: bool,
    /// Whether the interface is wireless; wireless interfaces get a lower local
    /// candidate priority.
    pub is_wireless: bool,
    /// MTU of the interface, when known.
    pub mtu: Option<u16>,
}
1874
/// Bookkeeping for a connection to a bootstrap node used for address observation.
// NOTE(review): no code in view constructs this type (hence `dead_code`); the field
// descriptions below follow the field names — confirm when it is wired up.
#[derive(Debug)]
#[allow(dead_code)]
struct BootstrapConnection {
    /// The underlying connection to the bootstrap node.
    connection: crate::Connection,
    /// Address of the bootstrap node.
    address: SocketAddr,
    /// When the connection was established.
    established_at: Instant,
    /// Identifier of the request associated with this connection.
    request_id: u64,
}
1888
/// Request asking a bootstrap node to report the address it observes for this endpoint.
// NOTE(review): unused in the visible code (hence `dead_code`). The fields mirror the
// layout written by `create_discovery_request` (u64 id, u64 timestamp, u32 capabilities),
// but that link is an assumption — confirm.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct AddressObservationRequest {
    /// Identifier used to correlate the request with its response.
    request_id: u64,
    /// Timestamp of the request; presumably milliseconds since the Unix epoch — confirm.
    timestamp: u64,
    /// Capability flags advertised with the request.
    capabilities: u32,
}
1900
/// Drives server reflexive address discovery by querying bootstrap nodes and collecting
/// the addresses they report.
#[derive(Debug)]
pub(crate) struct ServerReflexiveDiscovery {
    /// Discovery configuration (query timeout, retry limit, ...), cloned at construction.
    config: DiscoveryConfig,
    /// State of the query sent to each bootstrap node.
    active_queries: HashMap<BootstrapNodeId, QueryState>,
    /// Responses received but not yet handed out by `poll_queries`.
    responses: VecDeque<ServerReflexiveResponse>,
    /// Deadline after which the query to each bootstrap node counts as timed out.
    query_timeouts: HashMap<BootstrapNodeId, Instant>,
    /// Connections to bootstrap nodes; cleared whenever a new round of queries starts.
    active_connections: HashMap<BootstrapNodeId, BootstrapConnection>,
    /// Handle to the Tokio runtime that was current at construction, if any.
    runtime_handle: Option<tokio::runtime::Handle>,
}
1916
1917#[allow(dead_code)]
1918impl ServerReflexiveDiscovery {
1919 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
1920 Self {
1921 config: config.clone(),
1922 active_queries: HashMap::new(),
1923 responses: VecDeque::new(),
1924 query_timeouts: HashMap::new(),
1925 active_connections: HashMap::new(),
1926 runtime_handle: tokio::runtime::Handle::try_current().ok(),
1927 }
1928 }
1929
1930 pub(crate) fn start_queries(
1931 &mut self,
1932 bootstrap_nodes: &[BootstrapNodeId],
1933 now: Instant,
1934 ) -> HashMap<BootstrapNodeId, QueryState> {
1935 debug!(
1936 "Starting server reflexive queries to {} bootstrap nodes",
1937 bootstrap_nodes.len()
1938 );
1939
1940 self.active_queries.clear();
1941 self.query_timeouts.clear();
1942
1943 self.active_connections.clear();
1944
1945 for &node_id in bootstrap_nodes {
1946 let query_state = QueryState::Pending {
1947 sent_at: now,
1948 attempts: 1,
1949 };
1950
1951 self.active_queries.insert(node_id, query_state);
1952 self.query_timeouts
1953 .insert(node_id, now + self.config.bootstrap_query_timeout);
1954
1955 debug!(
1956 "Starting server reflexive query to bootstrap node {:?}",
1957 node_id
1958 );
1959
1960 if let Some(runtime) = &self.runtime_handle {
1962 self.start_quinn_query(node_id, runtime.clone(), now);
1963 } else {
1964 warn!(
1965 "No async runtime available, falling back to simulation for node {:?}",
1966 node_id
1967 );
1968 self.simulate_bootstrap_response(node_id, now);
1969 }
1970 }
1971
1972 self.active_queries.clone()
1973 }
1974
1975 pub(crate) fn start_queries_with_addresses(
1977 &mut self,
1978 bootstrap_nodes: &[(BootstrapNodeId, SocketAddr)],
1979 now: Instant,
1980 ) -> HashMap<BootstrapNodeId, QueryState> {
1981 debug!(
1982 "Starting server reflexive queries to {} bootstrap nodes with addresses",
1983 bootstrap_nodes.len()
1984 );
1985
1986 self.active_queries.clear();
1987 self.query_timeouts.clear();
1988
1989 self.active_connections.clear();
1990
1991 for &(node_id, bootstrap_address) in bootstrap_nodes {
1992 let query_state = QueryState::Pending {
1993 sent_at: now,
1994 attempts: 1,
1995 };
1996
1997 self.active_queries.insert(node_id, query_state);
1998 self.query_timeouts
1999 .insert(node_id, now + self.config.bootstrap_query_timeout);
2000
2001 debug!(
2002 "Starting server reflexive query to bootstrap node {:?} at {}",
2003 node_id, bootstrap_address
2004 );
2005
2006 if let Some(_runtime) = &self.runtime_handle {
2008 self.start_quinn_query_with_address(node_id, bootstrap_address, now);
2009 } else {
2010 warn!(
2011 "No async runtime available, falling back to simulation for node {:?}",
2012 node_id
2013 );
2014 self.simulate_bootstrap_response(node_id, now);
2015 }
2016 }
2017
2018 self.active_queries.clone()
2019 }
2020
2021 fn start_quinn_query(
2023 &mut self,
2024 node_id: BootstrapNodeId,
2025 _runtime: tokio::runtime::Handle,
2026 now: Instant,
2027 ) {
2028 let request_id = rand::random::<u64>();
2034
2035 debug!(
2036 "Starting Quinn connection to bootstrap node {:?} with request ID {}",
2037 node_id, request_id
2038 );
2039
2040 self.simulate_bootstrap_response(node_id, now);
2050 }
2051
2052 pub(crate) fn start_quinn_query_with_address(
2054 &mut self,
2055 node_id: BootstrapNodeId,
2056 bootstrap_address: SocketAddr,
2057 now: Instant,
2058 ) {
2059 let request_id = rand::random::<u64>();
2060
2061 info!(
2062 "Establishing Quinn connection to bootstrap node {:?} at {}",
2063 node_id, bootstrap_address
2064 );
2065
2066 if let Some(runtime) = &self.runtime_handle {
2068 let timeout = self.config.bootstrap_query_timeout;
2069
2070 let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
2072
2073 runtime.spawn(async move {
2078 match Self::perform_bootstrap_query(bootstrap_address, request_id, timeout).await {
2079 Ok(observed_address) => {
2080 let response = ServerReflexiveResponse {
2081 bootstrap_node: node_id,
2082 observed_address,
2083 response_time: now.elapsed(),
2084 timestamp: Instant::now(),
2085 };
2086
2087 let _ = response_tx.send(response);
2089
2090 info!(
2091 "Successfully received observed address {} from bootstrap node {:?}",
2092 observed_address, node_id
2093 );
2094 }
2095 Err(e) => {
2096 warn!(
2097 "Failed to query bootstrap node {:?} at {}: {}",
2098 node_id, bootstrap_address, e
2099 );
2100 }
2101 }
2102 });
2103 } else {
2104 warn!(
2105 "No async runtime available for Quinn query to {:?}",
2106 node_id
2107 );
2108 self.simulate_bootstrap_response(node_id, now);
2109 }
2110 }
2111
/// Asks the bootstrap node at `_bootstrap_address` for the address it observes.
///
/// # Errors
///
/// Always fails: the query is not implemented for the low-level API, and all parameters
/// are unused.
async fn perform_bootstrap_query(
    _bootstrap_address: SocketAddr,
    _request_id: u64,
    _timeout: Duration,
) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
    let not_implemented: Box<dyn std::error::Error + Send + Sync> =
        Box::from("Bootstrap query not implemented for low-level API");
    Err(not_implemented)
}
2186
2187 fn create_discovery_request(request_id: u64) -> Vec<u8> {
2189 let mut request = Vec::new();
2190
2191 request.extend_from_slice(&request_id.to_be_bytes());
2196 request.extend_from_slice(
2197 &std::time::SystemTime::now()
2198 .duration_since(std::time::UNIX_EPOCH)
2199 .unwrap_or_default()
2200 .as_millis()
2201 .to_be_bytes()[8..16],
2202 ); request.extend_from_slice(&1u32.to_be_bytes()); debug!(
2206 "Created discovery request: {} bytes, request_id: {}",
2207 request.len(),
2208 request_id
2209 );
2210 request
2211 }
2212
2213 async fn wait_for_add_address_frame(
2215 _connection: &Connection,
2216 _expected_request_id: u64,
2217 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
2218 Err("wait_for_add_address_frame not implemented for low-level API".into())
2221
2222 }
2258
2259 fn create_response_channel(
2261 &self,
2262 ) -> tokio::sync::mpsc::UnboundedSender<ServerReflexiveResponse> {
2263 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
2266 tx
2268 }
2269
2270 pub(crate) fn poll_queries(
2271 &mut self,
2272 _active_queries: &HashMap<BootstrapNodeId, QueryState>,
2273 now: Instant,
2274 ) -> Vec<ServerReflexiveResponse> {
2275 let mut responses = Vec::new();
2276
2277 while let Some(response) = self.responses.pop_front() {
2279 responses.push(response);
2280 }
2281
2282 let mut timed_out_nodes = Vec::new();
2284 for (&node_id, &timeout) in &self.query_timeouts {
2285 if now >= timeout {
2286 timed_out_nodes.push(node_id);
2287 }
2288 }
2289
2290 for node_id in timed_out_nodes {
2292 self.query_timeouts.remove(&node_id);
2293
2294 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
2295 match query_state {
2296 QueryState::Pending { attempts, .. }
2297 if *attempts < self.config.max_query_retries =>
2298 {
2299 *attempts += 1;
2301 let new_timeout = now + self.config.bootstrap_query_timeout;
2302 self.query_timeouts.insert(node_id, new_timeout);
2303
2304 debug!(
2305 "Retrying server reflexive query to bootstrap node {:?} (attempt {})",
2306 node_id, attempts
2307 );
2308
2309 self.simulate_bootstrap_response(node_id, now);
2311 }
2312 _ => {
2313 self.active_queries.insert(node_id, QueryState::Failed);
2315 warn!(
2316 "Server reflexive query to bootstrap node {:?} failed after retries",
2317 node_id
2318 );
2319 }
2320 }
2321 }
2322 }
2323
2324 responses
2325 }
2326
2327 fn simulate_bootstrap_response(&mut self, node_id: BootstrapNodeId, now: Instant) {
2330 let simulated_external_addr = match node_id.0 % 3 {
2332 0 => "203.0.113.1:45678".parse().unwrap(),
2333 1 => "198.51.100.2:45679".parse().unwrap(),
2334 _ => "192.0.2.3:45680".parse().unwrap(),
2335 };
2336
2337 let response = ServerReflexiveResponse {
2338 bootstrap_node: node_id,
2339 observed_address: simulated_external_addr,
2340 response_time: Duration::from_millis(50 + node_id.0 * 10),
2341 timestamp: now,
2342 };
2343
2344 self.responses.push_back(response);
2345
2346 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
2348 *query_state = QueryState::Completed;
2349 }
2350
2351 debug!(
2352 "Received simulated server reflexive response from bootstrap node {:?}: {}",
2353 node_id, simulated_external_addr
2354 );
2355 }
2356}
2357
/// Predicts candidate ports for symmetric NATs from observed port allocation history.
#[derive(Debug)]
pub(crate) struct SymmetricNatPredictor {
    /// Discovery configuration, cloned at construction.
    config: DiscoveryConfig,
}
2363
2364impl SymmetricNatPredictor {
2365 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2366 Self {
2367 config: config.clone(),
2368 }
2369 }
2370
2371 pub(crate) fn generate_predictions(
2376 &mut self,
2377 pattern_analysis: &PatternAnalysisState,
2378 max_count: usize,
2379 ) -> Vec<DiscoveryCandidate> {
2380 let mut predictions = Vec::new();
2381
2382 if pattern_analysis.allocation_history.is_empty() || max_count == 0 {
2383 return predictions;
2384 }
2385
2386 let recent_events: Vec<_> = pattern_analysis
2388 .allocation_history
2389 .iter()
2390 .rev()
2391 .take(5) .collect();
2393
2394 if recent_events.len() < 2 {
2395 return predictions;
2396 }
2397
2398 match &pattern_analysis.detected_pattern {
2399 Some(pattern) => {
2400 predictions.extend(self.generate_pattern_based_predictions(pattern, max_count));
2401 }
2402 None => {
2403 predictions.extend(self.generate_heuristic_predictions(&recent_events, max_count));
2404 }
2405 }
2406
2407 predictions.truncate(max_count);
2409 predictions
2410 }
2411
2412 fn generate_pattern_based_predictions(
2414 &self,
2415 pattern: &PortAllocationPattern,
2416 max_count: usize,
2417 ) -> Vec<DiscoveryCandidate> {
2418 let mut predictions = Vec::new();
2419
2420 match pattern.pattern_type {
2421 AllocationPatternType::Sequential => {
2422 for i in 1..=max_count as u16 {
2424 let predicted_port = pattern.base_port.wrapping_add(i);
2425 if self.is_valid_port(predicted_port) {
2426 predictions.push(
2427 self.create_predicted_candidate(predicted_port, pattern.confidence),
2428 );
2429 }
2430 }
2431 }
2432 AllocationPatternType::FixedStride => {
2433 for i in 1..=max_count as u16 {
2435 let predicted_port = pattern.base_port.wrapping_add(pattern.stride * i);
2436 if self.is_valid_port(predicted_port) {
2437 predictions.push(
2438 self.create_predicted_candidate(predicted_port, pattern.confidence),
2439 );
2440 }
2441 }
2442 }
2443 AllocationPatternType::PoolBased => {
2444 if let Some((min_port, max_port)) = pattern.pool_boundaries {
2446 let pool_size = max_port - min_port + 1;
2447 let step = (pool_size / max_count as u16).max(1);
2448
2449 for i in 0..max_count as u16 {
2450 let predicted_port = min_port + (i * step);
2451 if predicted_port <= max_port && self.is_valid_port(predicted_port) {
2452 predictions.push(self.create_predicted_candidate(
2453 predicted_port,
2454 pattern.confidence * 0.8,
2455 ));
2456 }
2457 }
2458 }
2459 }
2460 AllocationPatternType::TimeBased => {
2461 for i in 1..=max_count as u16 {
2464 let predicted_port = pattern.base_port.wrapping_add(i);
2465 if self.is_valid_port(predicted_port) {
2466 predictions.push(
2467 self.create_predicted_candidate(
2468 predicted_port,
2469 pattern.confidence * 0.6,
2470 ),
2471 );
2472 }
2473 }
2474 }
2475 AllocationPatternType::Random | AllocationPatternType::Unknown => {
2476 predictions
2478 .extend(self.generate_statistical_predictions(pattern.base_port, max_count));
2479 }
2480 }
2481
2482 predictions
2483 }
2484
2485 fn generate_heuristic_predictions(
2487 &self,
2488 recent_events: &[&PortAllocationEvent],
2489 max_count: usize,
2490 ) -> Vec<DiscoveryCandidate> {
2491 let mut predictions = Vec::new();
2492
2493 if let Some(latest_event) = recent_events.first() {
2494 let base_port = latest_event.port;
2495
2496 for i in 1..=(max_count / 3) as u16 {
2500 let predicted_port = base_port.wrapping_add(i);
2501 if self.is_valid_port(predicted_port) {
2502 predictions.push(self.create_predicted_candidate(predicted_port, 0.7));
2503 }
2504 }
2505
2506 if base_port % 2 == 0 {
2508 let predicted_port = base_port + 1;
2509 if self.is_valid_port(predicted_port) {
2510 predictions.push(self.create_predicted_candidate(predicted_port, 0.6));
2511 }
2512 }
2513
2514 for stride in [2, 4, 8, 16] {
2516 if predictions.len() >= max_count {
2517 break;
2518 }
2519 let predicted_port = base_port.wrapping_add(stride);
2520 if self.is_valid_port(predicted_port) {
2521 predictions.push(self.create_predicted_candidate(predicted_port, 0.5));
2522 }
2523 }
2524
2525 if recent_events.len() >= 2 {
2527 let stride = recent_events[0].port.wrapping_sub(recent_events[1].port);
2528 if stride > 0 && stride <= 100 {
2529 for i in 1..=3 {
2531 if predictions.len() >= max_count {
2532 break;
2533 }
2534 let predicted_port = base_port.wrapping_add(stride * i);
2535 if self.is_valid_port(predicted_port) {
2536 predictions.push(self.create_predicted_candidate(predicted_port, 0.4));
2537 }
2538 }
2539 }
2540 }
2541 }
2542
2543 predictions.truncate(max_count);
2544 predictions
2545 }
2546
2547 fn generate_statistical_predictions(
2549 &self,
2550 base_port: u16,
2551 max_count: usize,
2552 ) -> Vec<DiscoveryCandidate> {
2553 let mut predictions = Vec::new();
2554
2555 let common_ranges = [
2557 (1024, 5000), (5000, 10000), (10000, 20000), (32768, 65535), ];
2562
2563 let current_range = common_ranges
2565 .iter()
2566 .find(|(min, max)| base_port >= *min && base_port <= *max)
2567 .copied()
2568 .unwrap_or((1024, 65535));
2569
2570 let range_size = current_range.1 - current_range.0;
2572 let step = (range_size / max_count as u16).max(1);
2573
2574 for i in 0..max_count {
2575 let offset = (i as u16 * step) % range_size;
2576 let predicted_port = current_range.0 + offset;
2577
2578 if self.is_valid_port(predicted_port) && predicted_port != base_port {
2579 predictions.push(self.create_predicted_candidate(predicted_port, 0.3));
2580 }
2581 }
2582
2583 predictions
2584 }
2585
2586 fn is_valid_port(&self, port: u16) -> bool {
2588 port >= 1024 && port <= 65535 && port != 0
2590 }
2591
2592 fn create_predicted_candidate(&self, port: u16, confidence: f64) -> DiscoveryCandidate {
2594 let base_priority = 50; let priority = (base_priority as f64 * confidence) as u32;
2598
2599 DiscoveryCandidate {
2600 address: SocketAddr::new(
2601 "0.0.0.0".parse().unwrap(), port,
2603 ),
2604 priority,
2605 source: DiscoverySourceType::Predicted,
2606 state: CandidateState::New,
2607 }
2608 }
2609
2610 pub(crate) fn analyze_allocation_patterns(
2612 &self,
2613 history: &VecDeque<PortAllocationEvent>,
2614 ) -> Option<PortAllocationPattern> {
2615 if history.len() < 3 {
2616 return None;
2617 }
2618
2619 let recent_ports: Vec<u16> = history
2620 .iter()
2621 .rev()
2622 .take(10)
2623 .map(|event| event.port)
2624 .collect();
2625
2626 if let Some(pattern) = self.detect_sequential_pattern(&recent_ports) {
2628 return Some(pattern);
2629 }
2630
2631 if let Some(pattern) = self.detect_stride_pattern(&recent_ports) {
2633 return Some(pattern);
2634 }
2635
2636 if let Some(pattern) = self.detect_pool_pattern(&recent_ports) {
2638 return Some(pattern);
2639 }
2640
2641 if let Some(pattern) = self.detect_time_based_pattern(history) {
2643 return Some(pattern);
2644 }
2645
2646 None
2647 }
2648
2649 fn detect_sequential_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2651 if ports.len() < 3 {
2652 return None;
2653 }
2654
2655 let mut sequential_count = 0;
2656 let mut total_comparisons = 0;
2657
2658 for i in 1..ports.len() {
2659 total_comparisons += 1;
2660 let diff = ports[i - 1].wrapping_sub(ports[i]);
2661 if diff == 1 {
2662 sequential_count += 1;
2663 }
2664 }
2665
2666 let sequential_ratio = sequential_count as f64 / total_comparisons as f64;
2667
2668 if sequential_ratio >= 0.6 {
2669 let confidence = (sequential_ratio * 0.9).min(0.9); Some(PortAllocationPattern {
2673 pattern_type: AllocationPatternType::Sequential,
2674 base_port: ports[0],
2675 stride: 1,
2676 pool_boundaries: None,
2677 confidence,
2678 })
2679 } else {
2680 None
2681 }
2682 }
2683
2684 fn detect_stride_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2686 if ports.len() < 4 {
2687 return None;
2688 }
2689
2690 let mut diffs = Vec::new();
2692 for i in 1..ports.len() {
2693 let diff = ports[i - 1].wrapping_sub(ports[i]);
2694 if diff > 0 && diff <= 1000 {
2695 diffs.push(diff);
2697 }
2698 }
2699
2700 if diffs.len() < 2 {
2701 return None;
2702 }
2703
2704 let mut diff_counts = std::collections::HashMap::new();
2706 for &diff in &diffs {
2707 *diff_counts.entry(diff).or_insert(0) += 1;
2708 }
2709
2710 let (most_common_diff, count) = diff_counts
2711 .iter()
2712 .max_by_key(|(_, &count)| count)
2713 .map(|(&diff, &count)| (diff, count))?;
2714
2715 let consistency_ratio = count as f64 / diffs.len() as f64;
2716
2717 if consistency_ratio >= 0.5 && most_common_diff > 1 {
2718 let confidence = (consistency_ratio * 0.8).min(0.8); Some(PortAllocationPattern {
2722 pattern_type: AllocationPatternType::FixedStride,
2723 base_port: ports[0],
2724 stride: most_common_diff,
2725 pool_boundaries: None,
2726 confidence,
2727 })
2728 } else {
2729 None
2730 }
2731 }
2732
2733 fn detect_pool_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2735 if ports.len() < 5 {
2736 return None;
2737 }
2738
2739 let min_port = *ports.iter().min()?;
2740 let max_port = *ports.iter().max()?;
2741 let range = max_port - min_port;
2742
2743 if range > 0 && range <= 10000 {
2745 let expected_step = range / (ports.len() as u16 - 1);
2748 let mut uniform_score = 0.0;
2749
2750 let mut sorted_ports = ports.to_vec();
2751 sorted_ports.sort_unstable();
2752
2753 for i in 1..sorted_ports.len() {
2754 let actual_step = sorted_ports[i] - sorted_ports[i - 1];
2755 let step_diff = (actual_step as i32 - expected_step as i32).abs() as f64;
2756 let normalized_diff = step_diff / expected_step as f64;
2757 uniform_score += 1.0 - normalized_diff.min(1.0);
2758 }
2759
2760 uniform_score /= (sorted_ports.len() - 1) as f64;
2761
2762 if uniform_score >= 0.4 {
2763 let confidence = (uniform_score * 0.7).min(0.7); Some(PortAllocationPattern {
2767 pattern_type: AllocationPatternType::PoolBased,
2768 base_port: min_port,
2769 stride: expected_step,
2770 pool_boundaries: Some((min_port, max_port)),
2771 confidence,
2772 })
2773 } else {
2774 None
2775 }
2776 } else {
2777 None
2778 }
2779 }
2780
2781 fn detect_time_based_pattern(
2783 &self,
2784 history: &VecDeque<PortAllocationEvent>,
2785 ) -> Option<PortAllocationPattern> {
2786 if history.len() < 4 {
2787 return None;
2788 }
2789
2790 let mut time_intervals = Vec::new();
2792 let events: Vec<_> = history.iter().collect();
2793
2794 for i in 1..events.len() {
2795 let interval = events[i - 1].timestamp.duration_since(events[i].timestamp);
2796 time_intervals.push(interval);
2797 }
2798
2799 if time_intervals.is_empty() {
2800 return None;
2801 }
2802
2803 let avg_interval =
2805 time_intervals.iter().sum::<std::time::Duration>() / time_intervals.len() as u32;
2806
2807 let mut consistency_score = 0.0;
2808 for interval in &time_intervals {
2809 let diff = if *interval > avg_interval {
2810 *interval - avg_interval
2811 } else {
2812 avg_interval - *interval
2813 };
2814
2815 let normalized_diff = diff.as_millis() as f64 / avg_interval.as_millis() as f64;
2816 consistency_score += 1.0 - normalized_diff.min(1.0);
2817 }
2818
2819 consistency_score /= time_intervals.len() as f64;
2820
2821 if consistency_score >= 0.6
2822 && avg_interval.as_millis() > 100
2823 && avg_interval.as_millis() < 10000
2824 {
2825 let confidence = (consistency_score * 0.6).min(0.6); Some(PortAllocationPattern {
2828 pattern_type: AllocationPatternType::TimeBased,
2829 base_port: events[0].port,
2830 stride: 1, pool_boundaries: None,
2832 confidence,
2833 })
2834 } else {
2835 None
2836 }
2837 }
2838
2839 pub(crate) fn generate_confidence_scored_predictions(
2841 &mut self,
2842 base_address: SocketAddr,
2843 pattern_analysis: &PatternAnalysisState,
2844 max_count: usize,
2845 ) -> Vec<(DiscoveryCandidate, f64)> {
2846 let mut scored_predictions = Vec::new();
2847
2848 let predictions = self.generate_predictions(pattern_analysis, max_count);
2850
2851 for mut prediction in predictions {
2852 prediction.address = SocketAddr::new(base_address.ip(), prediction.address.port());
2854
2855 let confidence =
2857 self.calculate_prediction_confidence(&prediction, pattern_analysis, base_address);
2858
2859 scored_predictions.push((prediction, confidence));
2860 }
2861
2862 scored_predictions
2864 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2865
2866 scored_predictions
2867 }
2868
2869 fn calculate_prediction_confidence(
2871 &self,
2872 prediction: &DiscoveryCandidate,
2873 pattern_analysis: &PatternAnalysisState,
2874 base_address: SocketAddr,
2875 ) -> f64 {
2876 let mut confidence = 0.5; if let Some(ref pattern) = pattern_analysis.detected_pattern {
2880 confidence += pattern.confidence * 0.3;
2881 }
2882
2883 confidence += pattern_analysis.prediction_accuracy * 0.2;
2885
2886 let port_distance = (prediction.address.port() as i32 - base_address.port() as i32).abs();
2888 let proximity_score = if port_distance <= 10 {
2889 0.2
2890 } else if port_distance <= 100 {
2891 0.1
2892 } else {
2893 0.0
2894 };
2895 confidence += proximity_score;
2896
2897 let port_range_score = match prediction.address.port() {
2899 1024..=4999 => 0.1, 5000..=9999 => 0.15, 10000..=20000 => 0.1, 32768..=65535 => 0.05, _ => 0.0,
2904 };
2905 confidence += port_range_score;
2906
2907 confidence.max(0.0).min(1.0)
2909 }
2910
2911 pub(crate) fn update_pattern_analysis(
2913 &self,
2914 pattern_analysis: &mut PatternAnalysisState,
2915 new_event: PortAllocationEvent,
2916 ) {
2917 pattern_analysis.allocation_history.push_back(new_event);
2919
2920 if pattern_analysis.allocation_history.len() > 20 {
2922 pattern_analysis.allocation_history.pop_front();
2923 }
2924
2925 pattern_analysis.detected_pattern =
2927 self.analyze_allocation_patterns(&pattern_analysis.allocation_history);
2928
2929 if let Some(ref pattern) = pattern_analysis.detected_pattern {
2931 pattern_analysis.confidence_level = pattern.confidence;
2932 } else {
2933 pattern_analysis.confidence_level *= 0.9; }
2935
2936 pattern_analysis.prediction_accuracy *= 0.95;
2940 }
2941}
2942
2943#[derive(Debug)]
2945pub(crate) struct BootstrapNodeManager {
2946 config: DiscoveryConfig,
2947 bootstrap_nodes: HashMap<BootstrapNodeId, BootstrapNodeInfo>,
2948 health_stats: HashMap<BootstrapNodeId, BootstrapHealthStats>,
2949 performance_tracker: BootstrapPerformanceTracker,
2950 last_health_check: Option<Instant>,
2951 health_check_interval: Duration,
2952 failover_threshold: f64,
2953 discovery_sources: Vec<BootstrapDiscoverySource>,
2954}
2955
2956#[derive(Debug, Clone)]
2958pub(crate) struct BootstrapNodeInfo {
2959 pub address: SocketAddr,
2961 pub last_seen: Instant,
2963 pub can_coordinate: bool,
2965 pub health_status: BootstrapHealthStatus,
2967 pub capabilities: BootstrapCapabilities,
2969 pub priority: u32,
2971 pub discovery_source: BootstrapDiscoverySource,
2973}
2974
/// Health classification of a bootstrap node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BootstrapHealthStatus {
    /// High success rate and no consecutive failures.
    Healthy,
    /// Success rate has fallen below the failover threshold.
    Degraded,
    /// Three or more consecutive connection failures.
    Unhealthy,
    /// Not yet classified; newly registered nodes start here.
    Unknown,
}
2987
/// Feature set recorded for a bootstrap node.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapCapabilities {
    /// Node supports NAT traversal.
    pub supports_nat_traversal: bool,
    /// Node supports IPv6.
    pub supports_ipv6: bool,
    /// Node supports QUIC extensions.
    pub supports_quic_extensions: bool,
    /// Upper bound on simultaneous coordinations.
    pub max_concurrent_coordinations: u32,
    /// QUIC versions the node supports.
    pub supported_quic_versions: Vec<u32>,
}
3002
/// Running counters used to judge the health of one bootstrap node.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapHealthStats {
    /// Total connection attempts recorded.
    pub connection_attempts: u32,
    /// Attempts that succeeded.
    pub successful_connections: u32,
    /// Attempts that failed.
    pub failed_connections: u32,
    /// Mean of `recent_rtts`, recomputed during health checks.
    pub average_rtt: Option<Duration>,
    /// Most recent round-trip samples (the recorder keeps at most ten).
    pub recent_rtts: VecDeque<Duration>,
    /// Time of the last health check that covered this node.
    pub last_health_check: Option<Instant>,
    /// Failures since the last success; reset to zero on success.
    pub consecutive_failures: u32,
    /// Total coordination requests recorded.
    pub coordination_requests: u32,
    /// Coordination requests that succeeded.
    pub successful_coordinations: u32,
}
3025
3026#[derive(Debug, Default)]
3028pub(crate) struct BootstrapPerformanceTracker {
3029 pub overall_success_rate: f64,
3031 pub average_response_time: Duration,
3033 pub best_performers: Vec<BootstrapNodeId>,
3035 pub failover_nodes: Vec<BootstrapNodeId>,
3037 pub performance_history: VecDeque<PerformanceSnapshot>,
3039}
3040
/// Point-in-time record of the aggregate bootstrap performance.
#[derive(Debug, Clone)]
pub(crate) struct PerformanceSnapshot {
    /// When the snapshot was taken.
    pub timestamp: Instant,
    /// Number of active nodes at that time.
    pub active_nodes: u32,
    /// Overall success rate at that time.
    pub success_rate: f64,
    /// Average round-trip time at that time.
    pub average_rtt: Duration,
}
3049
/// How a bootstrap node became known.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum BootstrapDiscoverySource {
    /// Statically configured.
    Static,
    /// Found through DNS.
    DNS,
    /// Found through a DHT.
    DHT,
    /// Found through multicast.
    Multicast,
    /// Supplied by the user.
    UserProvided,
}
3064
3065impl BootstrapNodeManager {
3066 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3067 Self {
3068 config: config.clone(),
3069 bootstrap_nodes: HashMap::new(),
3070 health_stats: HashMap::new(),
3071 performance_tracker: BootstrapPerformanceTracker::default(),
3072 last_health_check: None,
3073 health_check_interval: Duration::from_secs(30),
3074 failover_threshold: 0.3, discovery_sources: vec![
3076 BootstrapDiscoverySource::Static,
3077 BootstrapDiscoverySource::DNS,
3078 BootstrapDiscoverySource::UserProvided,
3079 ],
3080 }
3081 }
3082
3083 pub(crate) fn update_bootstrap_nodes(&mut self, nodes: Vec<BootstrapNode>) {
3085 let now = Instant::now();
3086
3087 for (i, node) in nodes.into_iter().enumerate() {
3089 let node_id = BootstrapNodeId(i as u64);
3090
3091 let node_info = BootstrapNodeInfo {
3092 address: node.address,
3093 last_seen: node.last_seen,
3094 can_coordinate: node.can_coordinate,
3095 health_status: BootstrapHealthStatus::Unknown,
3096 capabilities: BootstrapCapabilities {
3097 supports_nat_traversal: node.can_coordinate,
3098 supports_ipv6: node.address.is_ipv6(),
3099 supports_quic_extensions: true, max_concurrent_coordinations: 100, supported_quic_versions: vec![1], },
3103 priority: self.calculate_initial_priority(&node),
3104 discovery_source: BootstrapDiscoverySource::UserProvided,
3105 };
3106
3107 self.bootstrap_nodes.insert(node_id, node_info);
3108
3109 if !self.health_stats.contains_key(&node_id) {
3111 self.health_stats
3112 .insert(node_id, BootstrapHealthStats::default());
3113 }
3114 }
3115
3116 info!("Updated {} bootstrap nodes", self.bootstrap_nodes.len());
3117 self.schedule_health_check(now);
3118 }
3119
3120 pub(crate) fn get_active_bootstrap_nodes(&self) -> Vec<BootstrapNodeId> {
3122 let mut active_nodes: Vec<_> = self
3123 .bootstrap_nodes
3124 .iter()
3125 .filter(|(_, node)| {
3126 matches!(
3127 node.health_status,
3128 BootstrapHealthStatus::Healthy | BootstrapHealthStatus::Unknown
3129 )
3130 })
3131 .map(|(&id, node)| (id, node))
3132 .collect();
3133
3134 active_nodes.sort_by(|a, b| {
3136 let health_cmp = self.compare_health_status(a.1.health_status, b.1.health_status);
3138 if health_cmp != std::cmp::Ordering::Equal {
3139 return health_cmp;
3140 }
3141
3142 b.1.priority.cmp(&a.1.priority)
3144 });
3145
3146 active_nodes.into_iter().map(|(id, _)| id).collect()
3147 }
3148
3149 pub(crate) fn get_bootstrap_address(&self, id: BootstrapNodeId) -> Option<SocketAddr> {
3151 self.bootstrap_nodes.get(&id).map(|node| node.address)
3152 }
3153
3154 pub(crate) fn perform_health_check(&mut self, now: Instant) {
3156 if let Some(last_check) = self.last_health_check {
3157 if now.duration_since(last_check) < self.health_check_interval {
3158 return; }
3160 }
3161
3162 debug!(
3163 "Performing health check on {} bootstrap nodes",
3164 self.bootstrap_nodes.len()
3165 );
3166
3167 let node_ids: Vec<BootstrapNodeId> = self.bootstrap_nodes.keys().copied().collect();
3169
3170 for node_id in node_ids {
3171 self.check_node_health(node_id, now);
3172 }
3173
3174 self.update_performance_metrics(now);
3175 self.last_health_check = Some(now);
3176 }
3177
3178 fn check_node_health(&mut self, node_id: BootstrapNodeId, now: Instant) {
3180 let node_info_opt = self.bootstrap_nodes.get(&node_id).cloned();
3182 if node_info_opt.is_none() {
3183 return; }
3185 let node_info_for_priority = node_info_opt.unwrap();
3186 let current_health_status = node_info_for_priority.health_status;
3187
3188 let (_success_rate, new_health_status, _average_rtt) = {
3190 let stats = self.health_stats.get_mut(&node_id).unwrap();
3191
3192 let success_rate = if stats.connection_attempts > 0 {
3194 stats.successful_connections as f64 / stats.connection_attempts as f64
3195 } else {
3196 1.0 };
3198
3199 if !stats.recent_rtts.is_empty() {
3201 let total_rtt: Duration = stats.recent_rtts.iter().sum();
3202 stats.average_rtt = Some(total_rtt / stats.recent_rtts.len() as u32);
3203 }
3204
3205 let new_health_status = if stats.consecutive_failures >= 3 {
3207 BootstrapHealthStatus::Unhealthy
3208 } else if success_rate < self.failover_threshold {
3209 BootstrapHealthStatus::Degraded
3210 } else if success_rate >= 0.8 && stats.consecutive_failures == 0 {
3211 BootstrapHealthStatus::Healthy
3212 } else {
3213 current_health_status };
3215
3216 stats.last_health_check = Some(now);
3217
3218 (success_rate, new_health_status, stats.average_rtt)
3219 };
3220
3221 let stats_snapshot = self.health_stats.get(&node_id).unwrap();
3223 let new_priority = self.calculate_dynamic_priority(&node_info_for_priority, stats_snapshot);
3224
3225 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
3227 if new_health_status != node_info.health_status {
3228 info!(
3229 "Bootstrap node {:?} health status changed: {:?} -> {:?}",
3230 node_id, node_info.health_status, new_health_status
3231 );
3232 node_info.health_status = new_health_status;
3233 }
3234
3235 node_info.priority = new_priority;
3236 }
3237 }
3238
3239 pub(crate) fn record_connection_attempt(
3241 &mut self,
3242 node_id: BootstrapNodeId,
3243 success: bool,
3244 rtt: Option<Duration>,
3245 ) {
3246 if let Some(stats) = self.health_stats.get_mut(&node_id) {
3247 stats.connection_attempts += 1;
3248
3249 if success {
3250 stats.successful_connections += 1;
3251 stats.consecutive_failures = 0;
3252
3253 if let Some(rtt) = rtt {
3254 stats.recent_rtts.push_back(rtt);
3255 if stats.recent_rtts.len() > 10 {
3256 stats.recent_rtts.pop_front();
3257 }
3258 }
3259 } else {
3260 stats.failed_connections += 1;
3261 stats.consecutive_failures += 1;
3262 }
3263 }
3264
3265 if success {
3267 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
3268 node_info.last_seen = Instant::now();
3269 }
3270 }
3271 }
3272
3273 pub(crate) fn record_coordination_result(&mut self, node_id: BootstrapNodeId, success: bool) {
3275 if let Some(stats) = self.health_stats.get_mut(&node_id) {
3276 stats.coordination_requests += 1;
3277 if success {
3278 stats.successful_coordinations += 1;
3279 }
3280 }
3281 }
3282
3283 pub(crate) fn get_best_performers(&self, count: usize) -> Vec<BootstrapNodeId> {
3285 let mut nodes_with_scores: Vec<_> = self
3286 .bootstrap_nodes
3287 .iter()
3288 .filter_map(|(&id, node)| {
3289 if matches!(node.health_status, BootstrapHealthStatus::Healthy) {
3290 let score = self.calculate_performance_score(id, node);
3291 Some((id, score))
3292 } else {
3293 None
3294 }
3295 })
3296 .collect();
3297
3298 nodes_with_scores
3299 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3300
3301 nodes_with_scores
3302 .into_iter()
3303 .take(count)
3304 .map(|(id, _)| id)
3305 .collect()
3306 }
3307
3308 pub(crate) fn discover_new_nodes(&mut self) -> Result<Vec<BootstrapNodeInfo>, String> {
3310 let mut discovered_nodes = Vec::new();
3311
3312 if let Ok(dns_nodes) = self.discover_via_dns() {
3314 discovered_nodes.extend(dns_nodes);
3315 }
3316
3317 if let Ok(multicast_nodes) = self.discover_via_multicast() {
3319 discovered_nodes.extend(multicast_nodes);
3320 }
3321
3322 for node in &discovered_nodes {
3324 let node_id = BootstrapNodeId(rand::random());
3325 self.bootstrap_nodes.insert(node_id, node.clone());
3326 self.health_stats
3327 .insert(node_id, BootstrapHealthStats::default());
3328 }
3329
3330 if !discovered_nodes.is_empty() {
3331 info!("Discovered {} new bootstrap nodes", discovered_nodes.len());
3332 }
3333
3334 Ok(discovered_nodes)
3335 }
3336
3337 fn discover_via_dns(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
3339 debug!("DNS-based bootstrap discovery not yet implemented");
3342 Ok(Vec::new())
3343 }
3344
3345 fn discover_via_multicast(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
3347 debug!("Multicast-based bootstrap discovery not yet implemented");
3350 Ok(Vec::new())
3351 }
3352
3353 fn calculate_initial_priority(&self, node: &BootstrapNode) -> u32 {
3355 let mut priority = 100; if node.can_coordinate {
3358 priority += 50;
3359 }
3360
3361 if let Some(rtt) = node.rtt {
3362 if rtt < Duration::from_millis(50) {
3363 priority += 30;
3364 } else if rtt < Duration::from_millis(100) {
3365 priority += 20;
3366 } else if rtt < Duration::from_millis(200) {
3367 priority += 10;
3368 }
3369 }
3370
3371 if node.address.is_ipv6() {
3373 priority += 10;
3374 }
3375
3376 priority
3377 }
3378
3379 fn calculate_dynamic_priority(
3381 &self,
3382 node_info: &BootstrapNodeInfo,
3383 stats: &BootstrapHealthStats,
3384 ) -> u32 {
3385 let mut priority = node_info.priority;
3386
3387 let success_rate = if stats.connection_attempts > 0 {
3389 stats.successful_connections as f64 / stats.connection_attempts as f64
3390 } else {
3391 1.0
3392 };
3393
3394 priority = (priority as f64 * success_rate) as u32;
3395
3396 if let Some(avg_rtt) = stats.average_rtt {
3398 if avg_rtt < Duration::from_millis(50) {
3399 priority += 20;
3400 } else if avg_rtt > Duration::from_millis(500) {
3401 priority = priority.saturating_sub(20);
3402 }
3403 }
3404
3405 priority = priority.saturating_sub(stats.consecutive_failures * 10);
3407
3408 priority.max(1) }
3410
3411 fn calculate_performance_score(
3413 &self,
3414 node_id: BootstrapNodeId,
3415 _node_info: &BootstrapNodeInfo,
3416 ) -> f64 {
3417 let stats = self.health_stats.get(&node_id).unwrap();
3418
3419 let mut score = 0.0;
3420
3421 let success_rate = if stats.connection_attempts > 0 {
3423 stats.successful_connections as f64 / stats.connection_attempts as f64
3424 } else {
3425 1.0
3426 };
3427 score += success_rate * 0.4;
3428
3429 if let Some(avg_rtt) = stats.average_rtt {
3431 let rtt_score = (1000.0 - avg_rtt.as_millis() as f64).max(0.0) / 1000.0;
3432 score += rtt_score * 0.3;
3433 } else {
3434 score += 0.3; }
3436
3437 let coord_success_rate = if stats.coordination_requests > 0 {
3439 stats.successful_coordinations as f64 / stats.coordination_requests as f64
3440 } else {
3441 1.0
3442 };
3443 score += coord_success_rate * 0.2;
3444
3445 let stability_score = if stats.consecutive_failures == 0 {
3447 1.0
3448 } else {
3449 1.0 / (stats.consecutive_failures as f64 + 1.0)
3450 };
3451 score += stability_score * 0.1;
3452
3453 score
3454 }
3455
3456 fn compare_health_status(
3458 &self,
3459 a: BootstrapHealthStatus,
3460 b: BootstrapHealthStatus,
3461 ) -> std::cmp::Ordering {
3462 use std::cmp::Ordering;
3463
3464 match (a, b) {
3465 (BootstrapHealthStatus::Healthy, BootstrapHealthStatus::Healthy) => Ordering::Equal,
3466 (BootstrapHealthStatus::Healthy, _) => Ordering::Less, (_, BootstrapHealthStatus::Healthy) => Ordering::Greater,
3468 (BootstrapHealthStatus::Unknown, BootstrapHealthStatus::Unknown) => Ordering::Equal,
3469 (BootstrapHealthStatus::Unknown, _) => Ordering::Less, (_, BootstrapHealthStatus::Unknown) => Ordering::Greater,
3471 (BootstrapHealthStatus::Degraded, BootstrapHealthStatus::Degraded) => Ordering::Equal,
3472 (BootstrapHealthStatus::Degraded, _) => Ordering::Less, (_, BootstrapHealthStatus::Degraded) => Ordering::Greater,
3474 (BootstrapHealthStatus::Unhealthy, BootstrapHealthStatus::Unhealthy) => Ordering::Equal,
3475 }
3476 }
3477
3478 fn update_performance_metrics(&mut self, now: Instant) {
3480 let mut total_attempts = 0;
3481 let mut total_successes = 0;
3482 let mut total_rtt = Duration::ZERO;
3483 let mut rtt_count = 0;
3484
3485 for stats in self.health_stats.values() {
3486 total_attempts += stats.connection_attempts;
3487 total_successes += stats.successful_connections;
3488
3489 if let Some(avg_rtt) = stats.average_rtt {
3490 total_rtt += avg_rtt;
3491 rtt_count += 1;
3492 }
3493 }
3494
3495 self.performance_tracker.overall_success_rate = if total_attempts > 0 {
3496 total_successes as f64 / total_attempts as f64
3497 } else {
3498 1.0
3499 };
3500
3501 self.performance_tracker.average_response_time = if rtt_count > 0 {
3502 total_rtt / rtt_count
3503 } else {
3504 Duration::from_millis(100) };
3506
3507 self.performance_tracker.best_performers = self.get_best_performers(5);
3509
3510 let snapshot = PerformanceSnapshot {
3512 timestamp: now,
3513 active_nodes: self.get_active_bootstrap_nodes().len() as u32,
3514 success_rate: self.performance_tracker.overall_success_rate,
3515 average_rtt: self.performance_tracker.average_response_time,
3516 };
3517
3518 self.performance_tracker
3519 .performance_history
3520 .push_back(snapshot);
3521 if self.performance_tracker.performance_history.len() > 100 {
3522 self.performance_tracker.performance_history.pop_front();
3523 }
3524 }
3525
3526 fn schedule_health_check(&mut self, _now: Instant) {
3528 }
3531
3532 pub(crate) fn get_performance_stats(&self) -> &BootstrapPerformanceTracker {
3534 &self.performance_tracker
3535 }
3536
3537 pub(crate) fn get_node_health_stats(
3539 &self,
3540 node_id: BootstrapNodeId,
3541 ) -> Option<&BootstrapHealthStats> {
3542 self.health_stats.get(&node_id)
3543 }
3544}
3545
3546#[derive(Debug)]
3548pub(crate) struct DiscoveryCache {
3549 config: DiscoveryConfig,
3550}
3551
3552impl DiscoveryCache {
3553 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3554 Self {
3555 config: config.clone(),
3556 }
3557 }
3558}
3559
3560pub(crate) fn create_platform_interface_discovery() -> Box<dyn NetworkInterfaceDiscovery + Send> {
3562 #[cfg(target_os = "windows")]
3563 return Box::new(WindowsInterfaceDiscovery::new());
3564
3565 #[cfg(target_os = "linux")]
3566 return Box::new(LinuxInterfaceDiscovery::new());
3567
3568 #[cfg(target_os = "macos")]
3569 return Box::new(MacOSInterfaceDiscovery::new());
3570
3571 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
3572 return Box::new(GenericInterfaceDiscovery::new());
3573}
3574
/// Fallback interface discovery that reports one fixed placeholder interface.
pub(crate) struct GenericInterfaceDiscovery {
    /// Set by `start_scan`, cleared again once the result has been handed out.
    scan_complete: bool,
}
3587
3588impl GenericInterfaceDiscovery {
3589 pub(crate) fn new() -> Self {
3590 Self {
3591 scan_complete: false,
3592 }
3593 }
3594}
3595
3596impl NetworkInterfaceDiscovery for GenericInterfaceDiscovery {
3597 fn start_scan(&mut self) -> Result<(), String> {
3598 self.scan_complete = true;
3600 Ok(())
3601 }
3602
3603 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>> {
3604 if self.scan_complete {
3605 self.scan_complete = false;
3606 Some(vec![NetworkInterface {
3607 name: "generic".to_string(),
3608 addresses: vec!["127.0.0.1:0".parse().unwrap()],
3609 is_up: true,
3610 is_wireless: false,
3611 mtu: Some(1500),
3612 }])
3613 } else {
3614 None
3615 }
3616 }
3617}
3618
3619impl std::fmt::Display for DiscoveryError {
3620 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3621 match self {
3622 Self::NoLocalInterfaces => write!(f, "no local network interfaces found"),
3623 Self::AllBootstrapsFailed => write!(f, "all bootstrap node queries failed"),
3624 Self::DiscoveryTimeout => write!(f, "discovery process timed out"),
3625 Self::InsufficientCandidates { found, required } => {
3626 write!(f, "insufficient candidates found: {} < {}", found, required)
3627 }
3628 Self::NetworkError(msg) => write!(f, "network error: {}", msg),
3629 Self::ConfigurationError(msg) => write!(f, "configuration error: {}", msg),
3630 Self::InternalError(msg) => write!(f, "internal error: {}", msg),
3631 }
3632 }
3633}
3634
3635impl std::error::Error for DiscoveryError {}
3636
3637pub mod test_utils {
3639 use super::*;
3640
3641 pub fn calculate_address_priority(address: &IpAddr) -> u32 {
3643 let mut priority = 100; match address {
3645 IpAddr::V4(ipv4) => {
3646 if ipv4.is_private() {
3647 priority += 50; }
3649 }
3650 IpAddr::V6(ipv6) => {
3651 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
3654 let segments = ipv6.segments();
3655 if segments[0] & 0xE000 == 0x2000 {
3656 priority += 60;
3658 } else if segments[0] & 0xFFC0 == 0xFE80 {
3659 priority += 20;
3661 } else if segments[0] & 0xFE00 == 0xFC00 {
3662 priority += 40;
3664 } else {
3665 priority += 30;
3667 }
3668 }
3669
3670 priority += 10; }
3673 }
3674 priority
3675 }
3676
3677 pub fn is_valid_address(address: &IpAddr) -> bool {
3679 match address {
3680 IpAddr::V4(ipv4) => !ipv4.is_loopback() && !ipv4.is_unspecified(),
3681 IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
3682 }
3683 }
3684}
3685
3686#[cfg(test)]
3687mod tests {
3688 use super::*;
3689 use std::collections::HashSet;
3690
3691 fn create_test_manager() -> CandidateDiscoveryManager {
3692 let config = DiscoveryConfig {
3693 total_timeout: Duration::from_secs(30),
3694 local_scan_timeout: Duration::from_secs(5),
3695 bootstrap_query_timeout: Duration::from_secs(10),
3696 max_query_retries: 3,
3697 max_candidates: 50,
3698 enable_symmetric_prediction: true,
3699 min_bootstrap_consensus: 2,
3700 interface_cache_ttl: Duration::from_secs(300),
3701 server_reflexive_cache_ttl: Duration::from_secs(600),
3702 bound_address: None,
3703 };
3704 CandidateDiscoveryManager::new(config)
3705 }
3706
/// An address accepted via QUIC discovery must show up as a server-reflexive candidate.
#[test]
fn test_accept_quic_discovered_addresses() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    let discovered_addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);

    assert!(result.is_ok());

    // Fail loudly if the session is missing instead of silently skipping the check.
    let session = manager
        .active_sessions
        .get(&peer_id)
        .expect("discovery session should exist after start_discovery");

    let found = session.discovered_candidates.iter().any(|c| {
        c.address == discovered_addr && matches!(c.source, DiscoverySourceType::ServerReflexive)
    });
    assert!(found, "QUIC-discovered address should be in candidates");
}
3728
/// Accepting an address for a peer without an active session must fail with an
/// `InternalError` that mentions the missing session.
#[test]
fn test_accept_quic_discovered_addresses_no_session() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);
    let discovered_addr = "192.168.1.100:5000".parse().unwrap();

    // No `start_discovery` call: the peer has no session.
    let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
    assert!(result.is_err());

    if let Err(DiscoveryError::InternalError(msg)) = result {
        assert!(msg.contains("No active discovery session"));
    } else {
        panic!("Expected InternalError for missing session");
    }
}
3746
/// Accepting the same address twice must succeed both times but store it only once.
#[test]
fn test_accept_quic_discovered_addresses_deduplication() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    let discovered_addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let result1 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
    let result2 = manager.accept_quic_discovered_address(peer_id, discovered_addr);

    assert!(result1.is_ok());
    assert!(result2.is_ok());

    // Fail loudly if the session is missing instead of silently skipping the check.
    let session = manager
        .active_sessions
        .get(&peer_id)
        .expect("discovery session should exist after start_discovery");

    let count = session
        .discovered_candidates
        .iter()
        .filter(|c| c.address == discovered_addr)
        .count();
    assert_eq!(count, 1, "Should not have duplicate addresses");
}
3771
/// Every accepted candidate must have a non-zero priority, and the IPv6 candidate must
/// rank at least as high as the public IPv4 one.
#[test]
fn test_accept_quic_discovered_addresses_priority() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    let public_addr: SocketAddr = "8.8.8.8:5000".parse().unwrap();
    let private_addr: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let ipv6_addr: SocketAddr = "[2001:db8::1]:5000".parse().unwrap();

    manager.accept_quic_discovered_address(peer_id, public_addr).unwrap();
    manager.accept_quic_discovered_address(peer_id, private_addr).unwrap();
    manager.accept_quic_discovered_address(peer_id, ipv6_addr).unwrap();

    // Fail loudly if the session or a candidate is missing instead of silently
    // skipping the comparisons.
    let session = manager
        .active_sessions
        .get(&peer_id)
        .expect("discovery session should exist after start_discovery");

    for candidate in &session.discovered_candidates {
        assert!(candidate.priority > 0, "All candidates should have non-zero priority");
    }

    let priority_of = |addr: SocketAddr| {
        session
            .discovered_candidates
            .iter()
            .find(|c| c.address == addr)
            .map(|c| c.priority)
    };

    let ipv4_priority = priority_of(public_addr).expect("public IPv4 candidate should be present");
    let ipv6_priority = priority_of(ipv6_addr).expect("IPv6 candidate should be present");
    assert!(ipv6_priority >= ipv4_priority);
}
3807
/// Polling after accepting an address must yield a server-reflexive discovery event for it.
#[test]
fn test_accept_quic_discovered_addresses_event_generation() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    let discovered_addr = "192.168.1.100:5000".parse().unwrap();
    manager.accept_quic_discovered_address(peer_id, discovered_addr).unwrap();

    let events = manager.poll_discovery_progress(peer_id);

    let has_event = events.iter().any(|event| match event {
        DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
            candidate.address == discovered_addr
        }
        _ => false,
    });

    assert!(has_event, "Should generate discovery event for QUIC-discovered address");
}
3831
/// After an address arrives via QUIC, the session must not be in the
/// `ServerReflexiveQuerying` phase.
#[test]
fn test_discovery_completes_without_server_reflexive_phase() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    let discovered_addr = "192.168.1.100:5000".parse().unwrap();
    manager.accept_quic_discovered_address(peer_id, discovered_addr).unwrap();

    let status = manager.get_discovery_status(peer_id).unwrap();

    // Any other phase is acceptable.
    if let DiscoveryPhase::ServerReflexiveQuerying { .. } = &status.phase {
        panic!("Should not be in ServerReflexiveQuerying phase when using QUIC discovery");
    }
}
3855
/// Addresses learned via QUIC must be reported in the status without any bootstrap
/// queries having been sent.
#[test]
fn test_no_bootstrap_queries_when_using_quic_discovery() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    let addr1: SocketAddr = "192.168.1.100:5000".parse().unwrap();
    let addr2: SocketAddr = "8.8.8.8:5000".parse().unwrap();
    manager.accept_quic_discovered_address(peer_id, addr1).unwrap();
    manager.accept_quic_discovered_address(peer_id, addr2).unwrap();

    let status = manager.get_discovery_status(peer_id).unwrap();
    assert!(status.discovered_candidates.len() >= 2);

    // Fail loudly if the session is missing instead of silently skipping the check.
    let session = manager
        .active_sessions
        .get(&peer_id)
        .expect("discovery session should exist after start_discovery");

    assert_eq!(
        session.statistics.bootstrap_queries_sent, 0,
        "Should not query bootstrap nodes when using QUIC discovery"
    );
}
3883
/// A QUIC-discovered public address must be stored as server-reflexive with a priority
/// strictly between 100 and 300.
#[test]
fn test_priority_differences_quic_vs_placeholder() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    let discovered_addr: SocketAddr = "8.8.8.8:5000".parse().unwrap();
    manager.accept_quic_discovered_address(peer_id, discovered_addr).unwrap();

    // Fail loudly if the session is missing instead of silently skipping the checks.
    let session = manager
        .active_sessions
        .get(&peer_id)
        .expect("discovery session should exist after start_discovery");

    let candidate = session
        .discovered_candidates
        .iter()
        .find(|c| c.address == discovered_addr)
        .expect("Should find the discovered address");

    assert!(candidate.priority > 100, "QUIC-discovered address should have good priority");
    assert!(candidate.priority < 300, "Priority should be reasonable");

    assert!(matches!(candidate.source, DiscoverySourceType::ServerReflexive));
}
3910
/// Each kind of address must receive a priority inside its expected inclusive range.
#[test]
fn test_quic_discovered_address_priority_calculation() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    // (address, inclusive priority range, description)
    let expectations = vec![
        ("1.2.3.4:5678", (250, 260), "Public IPv4"),
        ("192.168.1.100:9000", (240, 250), "Private IPv4"),
        ("[2001:db8::1]:5678", (260, 280), "Global IPv6"),
        ("[fe80::1]:5678", (220, 240), "Link-local IPv6"),
        ("[fc00::1]:5678", (240, 260), "Unique local IPv6"),
        ("10.0.0.1:9000", (240, 250), "Private IPv4 (10.x)"),
        ("172.16.0.1:9000", (240, 250), "Private IPv4 (172.16.x)"),
    ];

    for (addr_str, (min_priority, max_priority), description) in expectations {
        let addr: SocketAddr = addr_str.parse().unwrap();
        manager.accept_quic_discovered_address(peer_id, addr).unwrap();

        let session = manager.active_sessions.get(&peer_id).unwrap();
        let stored = session.discovered_candidates.iter().find(|c| c.address == addr);
        let candidate = match stored {
            Some(candidate) => candidate,
            None => panic!("No candidate found for {}", description),
        };

        assert!(
            candidate.priority >= min_priority && candidate.priority <= max_priority,
            "{} priority {} not in range [{}, {}]",
            description, candidate.priority, min_priority, max_priority
        );
    }
}
3948
/// Checks the relative ordering of priorities:
/// global IPv6 > public IPv4 (exactly 255) > private IPv4 > link-local IPv6.
#[test]
fn test_quic_discovered_priority_factors() {
    let manager = create_test_manager();

    let public_v4 = "1.2.3.4:5678".parse().unwrap();
    let base_priority = manager.calculate_quic_discovered_priority(&public_v4);
    assert_eq!(base_priority, 255, "Base priority should be 255 for public IPv4");

    let global_v6 = "[2001:db8::1]:5678".parse().unwrap();
    let ipv6_priority = manager.calculate_quic_discovered_priority(&global_v6);
    assert!(ipv6_priority > base_priority, "IPv6 should have higher priority than IPv4");

    let private_v4 = "192.168.1.1:5678".parse().unwrap();
    let private_priority = manager.calculate_quic_discovered_priority(&private_v4);
    assert!(private_priority < base_priority, "Private addresses should have lower priority");

    let link_local_v6 = "[fe80::1]:5678".parse().unwrap();
    let link_local_priority = manager.calculate_quic_discovered_priority(&link_local_v6);
    assert!(link_local_priority < private_priority, "Link-local should have lower priority than private");
}
3978
/// A newly accepted address must coexist with a pre-existing server-reflexive candidate
/// and receive its own recalculated priority.
#[test]
fn test_quic_discovered_addresses_override_stale_server_reflexive() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    // Seed the session with an older server-reflexive candidate on a different port.
    manager
        .active_sessions
        .get_mut(&peer_id)
        .unwrap()
        .discovered_candidates
        .push(DiscoveryCandidate {
            address: "1.2.3.4:1234".parse().unwrap(),
            priority: 200,
            source: DiscoverySourceType::ServerReflexive,
            state: CandidateState::Validating,
        });

    let new_addr = "1.2.3.4:5678".parse().unwrap();
    manager.accept_quic_discovered_address(peer_id, new_addr).unwrap();

    let session = manager.active_sessions.get(&peer_id).unwrap();
    let reflexive = || {
        session
            .discovered_candidates
            .iter()
            .filter(|c| c.source == DiscoverySourceType::ServerReflexive)
    };

    assert_eq!(reflexive().count(), 2, "Should have both old and new candidates");

    let new_candidate = reflexive().find(|c| c.address == new_addr).unwrap();
    assert_ne!(new_candidate.priority, 200, "New candidate should have recalculated priority");
}
4016
/// Verifies that accepting a QUIC-discovered address produces a
/// `ServerReflexiveCandidateDiscovered` event on the next poll.
///
/// The manager is polled once before the address is accepted and the result is
/// discarded, so only the events returned by the second poll are inspected.
#[test]
fn test_quic_discovered_address_generates_events() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    // Result intentionally ignored: only the poll after the accept matters.
    let _ = manager.poll_discovery_progress(peer_id);

    let discovered_addr = "8.8.8.8:5000".parse().unwrap();
    manager
        .accept_quic_discovered_address(peer_id, discovered_addr)
        .unwrap();

    let events = manager.poll_discovery_progress(peer_id);

    assert!(
        !events.is_empty(),
        "Should generate events for new QUIC-discovered address"
    );

    // Look for a server-reflexive event carrying exactly the accepted address.
    let announced = events.iter().any(|event| match event {
        DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
            candidate.address == discovered_addr
        }
        _ => false,
    });
    assert!(
        announced,
        "Should generate ServerReflexiveCandidateDiscovered event for the discovered address"
    );
}
4046
/// Verifies that several QUIC-discovered addresses accepted back-to-back each
/// produce their own `ServerReflexiveCandidateDiscovered` event.
///
/// Two IPv4 addresses and one IPv6 address are accepted without polling in
/// between; a single subsequent poll must contain one matching event per address.
#[test]
fn test_multiple_quic_discovered_addresses_generate_events() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    // Poll once up front and discard the result so that only the poll after the
    // accepts is inspected.
    manager.poll_discovery_progress(peer_id);

    // A fixed-size array is enough here: the collection is only iterated by
    // reference, so a heap-allocated `Vec` buys nothing.
    let addresses = [
        "8.8.8.8:5000".parse().unwrap(),
        "1.1.1.1:6000".parse().unwrap(),
        "[2001:db8::1]:7000".parse().unwrap(),
    ];

    for addr in &addresses {
        manager
            .accept_quic_discovered_address(peer_id, *addr)
            .unwrap();
    }

    let events = manager.poll_discovery_progress(peer_id);

    // Every accepted address must appear in at least one server-reflexive event.
    for addr in &addresses {
        let has_event = events.iter().any(|e| {
            matches!(
                e,
                DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
                    if candidate.address == *addr
            )
        });
        assert!(has_event, "Should have event for address {}", addr);
    }
}
4082
/// Verifies that accepting the same QUIC-discovered address a second time does
/// not produce another `ServerReflexiveCandidateDiscovered` event.
///
/// The events from the first accept are drained by an intermediate poll whose
/// result is discarded; only the poll after the repeated accept is inspected.
#[test]
fn test_duplicate_quic_discovered_address_no_event() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    let discovered_addr = "8.8.8.8:5000".parse().unwrap();

    // First sighting of the address, followed by a poll whose result is dropped.
    manager
        .accept_quic_discovered_address(peer_id, discovered_addr)
        .unwrap();
    let _ = manager.poll_discovery_progress(peer_id);

    // Second sighting of the very same address.
    manager
        .accept_quic_discovered_address(peer_id, discovered_addr)
        .unwrap();
    let events = manager.poll_discovery_progress(peer_id);

    // No event in this batch may announce the already-known address again.
    let no_repeat_announcement = events.iter().all(|event| match event {
        DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
            candidate.address != discovered_addr
        }
        _ => true,
    });

    assert!(
        no_repeat_announcement,
        "Should not generate event for duplicate address"
    );
}
4113
/// Verifies when `ServerReflexiveCandidateDiscovered` events are delivered.
///
/// Two addresses are accepted without polling in between. The next poll must
/// return at least two server-reflexive events, and the poll after that must
/// return none.
#[test]
fn test_quic_discovered_address_event_timing() {
    let mut manager = create_test_manager();
    let peer_id = PeerId([1; 32]);

    manager.start_discovery(peer_id, vec![]).unwrap();

    // Poll once up front and discard the result; only later polls are inspected.
    let _ = manager.poll_discovery_progress(peer_id);

    // Accept both addresses back-to-back, with no poll in between.
    for literal in ["8.8.8.8:5000", "1.1.1.1:6000"] {
        manager
            .accept_quic_discovered_address(peer_id, literal.parse().unwrap())
            .unwrap();
    }

    let first_batch = manager.poll_discovery_progress(peer_id);
    let delivered = first_batch
        .iter()
        .filter(|event| match event {
            DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. } => true,
            _ => false,
        })
        .count();

    assert!(
        delivered >= 2,
        "Should deliver all queued events on poll, got {} events",
        delivered
    );

    // A second poll with no new addresses must not report the same events again.
    let second_batch = manager.poll_discovery_progress(peer_id);
    let redelivered = second_batch
        .iter()
        .filter(|event| match event {
            DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. } => true,
            _ => false,
        })
        .count();
    assert_eq!(
        redelivered, 0,
        "Server reflexive events should not be duplicated on subsequent polls"
    );
}
4153}