1use crate::dark_resolver::DarkResolver;
5use crate::shadow_address::{DefaultShadowAddressHandler, NetworkType, ShadowAddress};
6use crate::types::NetworkError;
7use libp2p::PeerId as LibP2PPeerId;
8use rand::{seq::SliceRandom, Rng};
9use serde::{Deserialize, Serialize};
10use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
11use std::net::SocketAddr;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14use tokio::sync::{mpsc, RwLock, Semaphore};
15use tracing::{debug, info, warn};
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub enum DiscoveryMethod {
20 Kademlia,
22 Static,
24 Mdns,
26 Bootstrap,
28 DarkAddress,
30 DNS,
32 Gossip,
34 Hybrid(Vec<DiscoveryMethod>),
36}
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub enum NetworkConfig {
41 Public {
43 nat_traversal: bool,
45 upnp: bool,
47 stun_turn: bool,
49 },
50 Private {
52 allowed_ranges: Vec<String>,
54 require_auth: bool,
56 },
57 Hybrid {
59 public: Box<NetworkConfig>,
61 private: Box<NetworkConfig>,
63 fallback_public: bool,
65 },
66}
67
68#[derive(Debug, Clone)]
70pub struct DiscoveryConfig {
71 pub methods: Vec<DiscoveryMethod>,
73 pub bootstrap_nodes: Vec<SocketAddr>,
75 pub interval: u64,
77 pub max_peers: usize,
79 pub min_peers: usize,
81 pub reputation_threshold: f64,
83 pub network_config: NetworkConfig,
85 pub enable_dark_addressing: bool,
87 pub dark_resolver_config: DarkResolverConfig,
89 pub dht_config: DHTConfig,
91 pub max_concurrent_connections: usize,
93 pub scoring_config: PeerScoringConfig,
95 pub load_balancing_config: LoadBalancingConfig,
97 pub geo_preferences: GeoPreferences,
99}
100
/// Tunable parameters for the DHT.
#[derive(Debug, Clone)]
pub struct DHTConfig {
    /// Bucket size.
    pub bucket_size: usize,
    /// The `alpha` parameter.
    /// NOTE(review): presumably Kademlia's lookup parallelism; confirm against usage.
    pub alpha: usize,
    /// Replication factor.
    pub replication_factor: usize,
    /// Width of the key space, in bits.
    pub key_space_bits: usize,
    /// Timeout applied to bootstrap.
    pub bootstrap_timeout: Duration,
    /// Interval between refreshes.
    pub refresh_interval: Duration,
    /// Whether republishing is enabled.
    pub enable_republishing: bool,
}
119
/// Settings for the dark-address resolver.
#[derive(Debug, Clone)]
pub struct DarkResolverConfig {
    /// Maximum number of cache entries.
    pub max_cache_size: usize,
    /// Time-to-live of cache entries.
    pub cache_ttl: Duration,
    /// Whether distributed resolution is enabled.
    pub enable_distributed: bool,
    /// Fallback DNS servers, as strings.
    pub fallback_dns: Vec<String>,
    /// Maximum number of resolution attempts.
    pub max_resolution_attempts: usize,
}
134
/// Parameters that drive peer reputation scoring.
#[derive(Debug, Clone)]
pub struct PeerScoringConfig {
    /// Score assigned to a new peer.
    pub initial_score: f64,
    /// Upper clamp for a peer's score.
    pub max_score: f64,
    /// Lower clamp for a peer's score.
    pub min_score: f64,
    /// Score decay rate.
    pub score_decay_rate: f64,
    /// Amount added to the score on a successful connection.
    pub connection_success_bonus: f64,
    /// Amount subtracted from the score on a failed connection.
    pub connection_failure_penalty: f64,
    /// Bonus related to uptime.
    pub uptime_bonus: f64,
    /// Factor applied to latency when penalising.
    pub latency_penalty_factor: f64,
    /// Whether geographic information contributes to scoring.
    pub enable_geographic_scoring: bool,
}
157
158#[derive(Debug, Clone)]
160pub struct LoadBalancingConfig {
161 pub algorithm: LoadBalancingAlgorithm,
163 pub health_check_interval: Duration,
165 pub max_load_per_peer: f64,
167 pub enable_adaptive: bool,
169 pub circuit_breaker: CircuitBreakerConfig,
171}
172
/// Strategies available for choosing which peer receives the next request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoadBalancingAlgorithm {
    /// Round-robin selection.
    RoundRobin,
    /// Weighted round-robin selection.
    WeightedRoundRobin,
    /// Selection by fewest connections.
    LeastConnections,
    /// Selection by lowest response time.
    LeastResponseTime,
    /// Random selection.
    Random,
    /// Consistent hashing.
    ConsistentHashing,
    /// Resource-based selection.
    ResourceBased,
}
191
/// Thresholds and timings for a circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failure threshold.
    pub failure_threshold: usize,
    /// Success threshold.
    pub success_threshold: usize,
    /// Breaker timeout.
    pub timeout: Duration,
    /// Delay associated with the half-open state.
    pub half_open_delay: Duration,
}
204
/// Geographic preferences applied when choosing peers.
#[derive(Debug, Clone)]
pub struct GeoPreferences {
    /// Whether local peers are preferred.
    pub prefer_local: bool,
    /// Latency threshold for treating a peer as local.
    pub local_latency_threshold: Duration,
    /// Regions to prefer.
    pub preferred_regions: Vec<String>,
    /// Regions to avoid.
    pub avoided_regions: Vec<String>,
    /// Whether geographic diversity is enabled.
    pub enable_geo_diversity: bool,
}
219
220impl Default for DiscoveryConfig {
221 fn default() -> Self {
222 Self {
223 methods: vec![DiscoveryMethod::Kademlia, DiscoveryMethod::Mdns],
224 bootstrap_nodes: vec![],
225 interval: 30,
226 max_peers: 50,
227 min_peers: 8,
228 reputation_threshold: 0.0,
229 network_config: NetworkConfig::Public {
230 nat_traversal: true,
231 upnp: true,
232 stun_turn: true,
233 },
234 enable_dark_addressing: true,
235 dark_resolver_config: DarkResolverConfig::default(),
236 dht_config: DHTConfig::default(),
237 max_concurrent_connections: 100,
238 scoring_config: PeerScoringConfig::default(),
239 load_balancing_config: LoadBalancingConfig::default(),
240 geo_preferences: GeoPreferences::default(),
241 }
242 }
243}
244
245impl Default for DHTConfig {
246 fn default() -> Self {
247 Self {
248 bucket_size: 20,
249 alpha: 3,
250 replication_factor: 20,
251 key_space_bits: 256,
252 bootstrap_timeout: Duration::from_secs(30),
253 refresh_interval: Duration::from_secs(3600),
254 enable_republishing: true,
255 }
256 }
257}
258
259impl Default for DarkResolverConfig {
260 fn default() -> Self {
261 Self {
262 max_cache_size: 10000,
263 cache_ttl: Duration::from_secs(3600),
264 enable_distributed: true,
265 fallback_dns: vec!["8.8.8.8".to_string(), "1.1.1.1".to_string()],
266 max_resolution_attempts: 3,
267 }
268 }
269}
270
271impl Default for PeerScoringConfig {
272 fn default() -> Self {
273 Self {
274 initial_score: 50.0,
275 max_score: 100.0,
276 min_score: -50.0,
277 score_decay_rate: 0.1,
278 connection_success_bonus: 5.0,
279 connection_failure_penalty: 10.0,
280 uptime_bonus: 1.0,
281 latency_penalty_factor: 0.01,
282 enable_geographic_scoring: true,
283 }
284 }
285}
286
287impl Default for LoadBalancingConfig {
288 fn default() -> Self {
289 Self {
290 algorithm: LoadBalancingAlgorithm::WeightedRoundRobin,
291 health_check_interval: Duration::from_secs(30),
292 max_load_per_peer: 100.0,
293 enable_adaptive: true,
294 circuit_breaker: CircuitBreakerConfig::default(),
295 }
296 }
297}
298
299impl Default for CircuitBreakerConfig {
300 fn default() -> Self {
301 Self {
302 failure_threshold: 5,
303 success_threshold: 3,
304 timeout: Duration::from_secs(60),
305 half_open_delay: Duration::from_secs(30),
306 }
307 }
308}
309
310impl Default for GeoPreferences {
311 fn default() -> Self {
312 Self {
313 prefer_local: true,
314 local_latency_threshold: Duration::from_millis(100),
315 preferred_regions: vec![],
316 avoided_regions: vec![],
317 enable_geo_diversity: true,
318 }
319 }
320}
321
322#[derive(Debug, Clone)]
324pub struct DiscoveredPeer {
325 pub peer_id: LibP2PPeerId,
327 pub addresses: Vec<SocketAddr>,
329 pub dark_addresses: Vec<ShadowAddress>,
331 pub discovered_at: Instant,
333 pub discovery_method: DiscoveryMethod,
335 pub reputation: f64,
337 pub connection_attempts: u32,
339 pub successful_connections: u32,
341 pub last_connection_attempt: Option<Instant>,
343 pub last_successful_connection: Option<Instant>,
345 pub protocols: Vec<String>,
347 pub geographic_info: Option<GeographicInfo>,
349 pub performance_metrics: PeerPerformanceMetrics,
351 pub load_metrics: PeerLoadMetrics,
353 pub capabilities: PeerCapabilities,
355 pub connection_quality: ConnectionQuality,
357 pub is_blacklisted: bool,
359 pub blacklist_reason: Option<String>,
361 pub uptime_stats: UptimeStats,
363 pub circuit_breaker_state: CircuitBreakerState,
365}
366
/// Geographic and network-provider details for a peer.
#[derive(Debug, Clone)]
pub struct GeographicInfo {
    /// Country code.
    pub country_code: String,
    /// City, when known.
    pub city: Option<String>,
    /// Latitude, when known.
    pub latitude: Option<f64>,
    /// Longitude, when known.
    pub longitude: Option<f64>,
    /// Estimated distance to the peer in kilometres, when known.
    pub estimated_distance_km: Option<f64>,
    /// Autonomous system number, when known.
    pub asn: Option<u32>,
    /// Internet service provider, when known.
    pub isp: Option<String>,
}
385
/// Response-time, throughput and error measurements for a peer.
#[derive(Debug, Clone, Default)]
pub struct PeerPerformanceMetrics {
    /// Average response time.
    pub avg_response_time: Duration,
    /// Smallest observed response time.
    pub min_response_time: Duration,
    /// Largest observed response time.
    pub max_response_time: Duration,
    /// 95th-percentile response time.
    pub p95_response_time: Duration,
    /// Throughput.
    /// NOTE(review): the `mps` unit is presumably messages per second; confirm.
    pub throughput_mps: f64,
    /// Bandwidth in bits or bytes per second.
    /// NOTE(review): the `bps` suffix is ambiguous; confirm the unit.
    pub bandwidth_bps: u64,
    /// Error rate.
    pub error_rate: f64,
    /// Jitter.
    pub jitter: Duration,
    /// Packet loss rate.
    pub packet_loss_rate: f64,
}
408
/// Load measurements for a peer.
#[derive(Debug, Clone, Default)]
pub struct PeerLoadMetrics {
    /// Number of active connections.
    pub active_connections: usize,
    /// Aggregate load score.
    pub load_score: f64,
    /// CPU utilisation, when reported.
    pub cpu_utilization: Option<f64>,
    /// Memory utilisation, when reported.
    pub memory_utilization: Option<f64>,
    /// Network utilisation, when reported.
    pub network_utilization: Option<f64>,
    /// Queue depth.
    pub queue_depth: usize,
    /// Selection weight.
    pub weight: f64,
}
427
/// Capabilities a peer has or advertises.
#[derive(Debug, Clone, Default)]
pub struct PeerCapabilities {
    /// Supported protocol versions.
    pub protocol_versions: Vec<String>,
    /// Maximum number of concurrent connections, when known.
    pub max_concurrent_connections: Option<usize>,
    /// Supported message types.
    pub supported_message_types: Vec<String>,
    /// Whether the peer supports dark addressing.
    pub supports_dark_addressing: bool,
    /// Whether the peer supports onion routing.
    pub supports_onion_routing: bool,
    /// Whether the peer participates in the DHT.
    pub participates_in_dht: bool,
    /// Whether the peer can relay.
    pub can_relay: bool,
    /// Whether the peer provides storage.
    pub provides_storage: bool,
    /// Bandwidth capacity, when known.
    pub bandwidth_capacity: Option<u64>,
}
450
/// Quality scores for the connection to a peer.
#[derive(Debug, Clone, Default)]
pub struct ConnectionQuality {
    /// Combined score.
    pub overall_score: f64,
    /// Reliability component.
    pub reliability_score: f64,
    /// Performance component.
    pub performance_score: f64,
    /// Availability component.
    pub availability_score: f64,
    /// Security component.
    pub security_score: f64,
    /// When the scores were last assessed; `None` if never.
    pub last_assessed: Option<Instant>,
}
467
/// Uptime bookkeeping for a peer.
#[derive(Debug, Clone, Default)]
pub struct UptimeStats {
    /// Total time the peer has been observed.
    pub total_observed_time: Duration,
    /// Portion of the observed time during which the peer was up.
    pub total_uptime: Duration,
    /// Uptime as a percentage of the observed time.
    pub uptime_percentage: f64,
    /// Number of disconnections.
    pub disconnection_count: u32,
    /// Average session duration.
    pub avg_session_duration: Duration,
    /// Longest session duration.
    pub longest_session_duration: Duration,
}
484
/// State of a per-peer circuit breaker.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CircuitBreakerState {
    /// The breaker is closed.
    Closed,
    /// The breaker is open.
    Open {
        /// When the breaker opened.
        opened_at: Instant,
        /// Failure count recorded when the breaker opened.
        failure_count: usize,
    },
    /// The breaker is half-open (being tested).
    HalfOpen {
        /// Number of test requests made.
        test_requests: usize,
        /// Number of test requests that succeeded.
        successful_tests: usize,
    },
}
505
506impl DiscoveredPeer {
507 pub fn new(peer_id: LibP2PPeerId, address: SocketAddr, method: DiscoveryMethod) -> Self {
509 Self {
510 peer_id,
511 addresses: vec![address],
512 dark_addresses: vec![],
513 discovered_at: Instant::now(),
514 discovery_method: method,
515 reputation: 50.0, connection_attempts: 0,
517 successful_connections: 0,
518 last_connection_attempt: None,
519 last_successful_connection: None,
520 protocols: vec![],
521 geographic_info: None,
522 performance_metrics: PeerPerformanceMetrics::default(),
523 load_metrics: PeerLoadMetrics::default(),
524 capabilities: PeerCapabilities::default(),
525 connection_quality: ConnectionQuality::default(),
526 is_blacklisted: false,
527 blacklist_reason: None,
528 uptime_stats: UptimeStats::default(),
529 circuit_breaker_state: CircuitBreakerState::Closed,
530 }
531 }
532
533 pub fn should_attempt_connection(&self) -> bool {
535 self.should_attempt_connection_with_config(&PeerScoringConfig::default())
536 }
537
538 pub fn should_attempt_connection_with_config(&self, config: &PeerScoringConfig) -> bool {
540 if self.is_blacklisted {
542 return false;
543 }
544
545 match &self.circuit_breaker_state {
547 CircuitBreakerState::Open { opened_at, .. } => {
548 if opened_at.elapsed() < Duration::from_secs(60) {
550 return false;
551 }
552 }
553 CircuitBreakerState::HalfOpen { test_requests, .. } => {
554 if *test_requests >= 3 {
556 return false;
557 }
558 }
559 CircuitBreakerState::Closed => {}
560 }
561
562 if self.reputation < config.min_score {
564 return false;
565 }
566
567 if self.connection_attempts > 3 {
569 if let Some(last_attempt) = self.last_connection_attempt {
570 let backoff_time =
571 Duration::from_secs((self.connection_attempts as u64).pow(2) * 30);
572 if last_attempt.elapsed() < backoff_time {
573 return false;
574 }
575 }
576 }
577
578 if self.connection_quality.overall_score < 0.3 {
580 return false;
581 }
582
583 true
584 }
585
586 pub fn record_connection_attempt(&mut self, success: bool, config: &PeerScoringConfig) {
588 self.connection_attempts += 1;
589 self.last_connection_attempt = Some(Instant::now());
590
591 if success {
592 self.successful_connections += 1;
593 self.last_successful_connection = Some(Instant::now());
594 self.reputation += config.connection_success_bonus;
595 self.reputation = self.reputation.min(config.max_score);
596
597 match &self.circuit_breaker_state {
599 CircuitBreakerState::HalfOpen {
600 successful_tests, ..
601 } => {
602 let new_successful = successful_tests + 1;
603 if new_successful >= 3 {
604 self.circuit_breaker_state = CircuitBreakerState::Closed;
605 } else {
606 self.circuit_breaker_state = CircuitBreakerState::HalfOpen {
607 test_requests: 0,
608 successful_tests: new_successful,
609 };
610 }
611 }
612 _ => {
613 self.circuit_breaker_state = CircuitBreakerState::Closed;
614 }
615 }
616
617 self.update_connection_quality(true);
619 } else {
620 self.reputation -= config.connection_failure_penalty;
621 self.reputation = self.reputation.max(config.min_score);
622
623 match &self.circuit_breaker_state {
625 CircuitBreakerState::Closed => {
626 if self.connection_attempts >= 5 {
627 self.circuit_breaker_state = CircuitBreakerState::Open {
628 opened_at: Instant::now(),
629 failure_count: self.connection_attempts as usize,
630 };
631 }
632 }
633 CircuitBreakerState::HalfOpen { .. } => {
634 self.circuit_breaker_state = CircuitBreakerState::Open {
635 opened_at: Instant::now(),
636 failure_count: self.connection_attempts as usize,
637 };
638 }
639 _ => {}
640 }
641
642 self.update_connection_quality(false);
644 }
645 }
646
647 fn update_connection_quality(&mut self, _success: bool) {
649 let success_rate = if self.connection_attempts > 0 {
650 self.successful_connections as f64 / self.connection_attempts as f64
651 } else {
652 0.0
653 };
654
655 self.connection_quality.reliability_score = success_rate;
656
657 let performance_factor = 1.0 - (self.performance_metrics.error_rate * 0.5);
659 let availability_factor = self.uptime_stats.uptime_percentage / 100.0;
660
661 self.connection_quality.overall_score = (self.connection_quality.reliability_score * 0.4
662 + performance_factor * 0.3
663 + availability_factor * 0.2
664 + self.connection_quality.security_score * 0.1)
665 .clamp(0.0, 1.0);
666
667 self.connection_quality.last_assessed = Some(Instant::now());
668 }
669
670 pub fn update_performance_metrics(&mut self, response_time: Duration, success: bool) {
672 if success {
673 if self.performance_metrics.min_response_time == Duration::ZERO {
675 self.performance_metrics.min_response_time = response_time;
676 self.performance_metrics.max_response_time = response_time;
677 self.performance_metrics.avg_response_time = response_time;
678 } else {
679 self.performance_metrics.min_response_time = self
680 .performance_metrics
681 .min_response_time
682 .min(response_time);
683 self.performance_metrics.max_response_time = self
684 .performance_metrics
685 .max_response_time
686 .max(response_time);
687
688 let alpha = 0.1;
690 let current_avg = self.performance_metrics.avg_response_time.as_secs_f64();
691 let new_avg = alpha * response_time.as_secs_f64() + (1.0 - alpha) * current_avg;
692 self.performance_metrics.avg_response_time = Duration::from_secs_f64(new_avg);
693 }
694 }
695
696 let total_requests = self.connection_attempts as f64;
698 let failed_requests = (self.connection_attempts - self.successful_connections) as f64;
699 self.performance_metrics.error_rate = if total_requests > 0.0 {
700 failed_requests / total_requests
701 } else {
702 0.0
703 };
704 }
705
706 pub fn update_load_metrics(&mut self, active_connections: usize, queue_depth: usize) {
708 self.load_metrics.active_connections = active_connections;
709 self.load_metrics.queue_depth = queue_depth;
710
711 let connection_factor = if let Some(max_conn) = self.capabilities.max_concurrent_connections
713 {
714 active_connections as f64 / max_conn as f64
715 } else {
716 active_connections as f64 / 100.0 };
718
719 let queue_factor = queue_depth as f64 / 50.0; self.load_metrics.load_score =
722 ((connection_factor + queue_factor) * 50.0).clamp(0.0, 100.0);
723
724 self.load_metrics.weight = (100.0 - self.load_metrics.load_score).max(1.0);
726 }
727
728 pub fn is_healthy(&self) -> bool {
730 !self.is_blacklisted
731 && self.circuit_breaker_state == CircuitBreakerState::Closed
732 && self.connection_quality.overall_score > 0.5
733 && self.load_metrics.load_score < 90.0
734 }
735
736 pub fn calculate_priority(&self, config: &PeerScoringConfig) -> f64 {
738 let mut priority = self.reputation;
739
740 priority += self.connection_quality.overall_score * 20.0;
742
743 priority += (100.0 - self.load_metrics.load_score) * 0.1;
745
746 if let Some(geo_info) = &self.geographic_info {
748 if config.enable_geographic_scoring {
749 if let Some(distance) = geo_info.estimated_distance_km {
750 let distance_bonus = (1000.0 - distance.min(1000.0)) / 100.0;
752 priority += distance_bonus;
753 }
754 }
755 }
756
757 priority += self.uptime_stats.uptime_percentage * 0.1;
759
760 priority.max(0.0)
761 }
762
763 pub fn add_dark_address(&mut self, address: ShadowAddress) {
765 if !self.dark_addresses.contains(&address) {
766 self.dark_addresses.push(address);
767 self.capabilities.supports_dark_addressing = true;
768 }
769 }
770
771 pub fn update_geographic_info(&mut self, geo_info: GeographicInfo) {
773 self.geographic_info = Some(geo_info);
774 }
775
776 pub fn blacklist(&mut self, reason: String) {
778 self.is_blacklisted = true;
779 self.blacklist_reason = Some(reason);
780 self.reputation = -50.0; }
782
783 pub fn unblacklist(&mut self) {
785 self.is_blacklisted = false;
786 self.blacklist_reason = None;
787 self.reputation = 0.0; }
789
790 pub fn decay_reputation(&mut self, config: &PeerScoringConfig, hours_elapsed: f64) {
792 let decay_amount = config.score_decay_rate * hours_elapsed;
793 self.reputation = (self.reputation - decay_amount).max(config.min_score);
794 }
795
796 pub fn update_uptime(&mut self, is_online: bool, duration: Duration) {
798 self.uptime_stats.total_observed_time += duration;
799
800 if is_online {
801 self.uptime_stats.total_uptime += duration;
802 } else {
803 self.uptime_stats.disconnection_count += 1;
804 }
805
806 if self.uptime_stats.total_observed_time > Duration::ZERO {
808 self.uptime_stats.uptime_percentage = (self.uptime_stats.total_uptime.as_secs_f64()
809 / self.uptime_stats.total_observed_time.as_secs_f64())
810 * 100.0;
811 }
812 }
813}
814
815#[allow(dead_code)]
817pub struct KademliaPeerDiscovery {
818 config: DiscoveryConfig,
820 discovered_peers: Arc<RwLock<HashMap<LibP2PPeerId, DiscoveredPeer>>>,
822 static_peers: HashSet<SocketAddr>,
824 bootstrap_completed: bool,
826 discovery_active: bool,
828 event_tx: Option<mpsc::Sender<DiscoveryEvent>>,
830 bootstrap_tried: HashSet<SocketAddr>,
832 last_discovery: Option<Instant>,
834 dark_resolver: Arc<DarkResolver>,
836 shadow_handler: DefaultShadowAddressHandler,
838 dht_buckets: Arc<RwLock<BTreeMap<usize, Vec<LibP2PPeerId>>>>,
840 connection_semaphore: Arc<Semaphore>,
842 load_balancer: Arc<Mutex<LoadBalancer>>,
844 peer_selector: Arc<Mutex<PeerSelector>>,
846 topology_optimizer: Arc<Mutex<TopologyOptimizer>>,
848 health_checker: Arc<Mutex<HealthChecker>>,
850 performance_monitor: Arc<Mutex<PerformanceMonitor>>,
852 bootstrap_strategy: BootstrapStrategy,
854}
855
856#[derive(Debug, Clone)]
858pub enum BootstrapStrategy {
859 Conservative,
861 Aggressive,
863 Adaptive {
865 aggressiveness: f64,
867 last_adapted: Instant,
869 },
870 Custom {
872 max_concurrent: usize,
874 attempt_timeout: Duration,
876 retry_strategy: RetryStrategy,
878 },
879}
880
/// Retry timing policies.
#[derive(Debug, Clone)]
pub enum RetryStrategy {
    /// Exponentially growing delay.
    ExponentialBackoff {
        /// Delay before the first retry.
        initial_delay: Duration,
        /// Upper bound on the delay.
        max_delay: Duration,
        /// Growth multiplier.
        multiplier: f64,
    },
    /// A fixed delay between retries.
    FixedInterval(Duration),
    /// Linearly growing delay.
    LinearBackoff {
        /// Delay before the first retry.
        initial_delay: Duration,
        /// Amount added to the delay.
        increment: Duration,
    },
    /// No retries.
    None,
}
905
906#[derive(Debug, Clone)]
908pub enum DiscoveryEvent {
909 PeerDiscovered(DiscoveredPeer),
911 ReputationUpdated {
913 peer_id: LibP2PPeerId,
914 old_reputation: f64,
915 new_reputation: f64,
916 reason: String,
917 },
918 BootstrapCompleted {
920 peers_discovered: usize,
922 duration: Duration,
924 success_rate: f64,
926 },
927 BootstrapFailed {
929 reason: String,
931 attempted_nodes: usize,
933 successful_connections: usize,
935 },
936 PeerConnected {
938 peer_id: LibP2PPeerId,
939 address: SocketAddr,
940 connection_time: Duration,
941 },
942 PeerDisconnected {
944 peer_id: LibP2PPeerId,
945 reason: String,
946 session_duration: Duration,
947 },
948 PeerBlacklisted {
950 peer_id: LibP2PPeerId,
951 reason: String,
952 reputation: f64,
953 },
954 DarkAddressDiscovered {
956 peer_id: LibP2PPeerId,
957 dark_address: ShadowAddress,
958 resolution_time: Duration,
959 },
960 TopologyUpdated {
962 largest_component_size: usize,
964 avg_clustering: f64,
966 diameter: Option<usize>,
968 },
969 LoadBalancingUpdated {
971 active_connections: usize,
973 load_entropy: f64,
975 overloaded_peers: usize,
977 },
978 GeographicDistributionUpdated {
980 countries: usize,
982 diversity_score: f64,
984 avg_distance_km: f64,
986 },
987 DHTBucketUpdated {
989 bucket_index: usize,
991 peer_count: usize,
993 health_score: f64,
995 },
996 DiscoveryError {
998 error: String,
1000 category: DiscoveryErrorCategory,
1002 retry_suggested: bool,
1004 },
1005}
1006
/// Broad classification of discovery errors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryErrorCategory {
    /// Network-level error.
    NetworkError,
    /// Configuration error.
    ConfigurationError,
    /// Resource error.
    ResourceError,
    /// Protocol error.
    ProtocolError,
    /// Security error.
    SecurityError,
    /// Timeout.
    TimeoutError,
    /// Dark addressing error.
    DarkAddressingError,
    /// DHT error.
    DHTError,
}
1027
1028#[derive(Debug)]
1030pub struct LoadBalancer {
1031 pub algorithm: LoadBalancingAlgorithm,
1033 pub round_robin_index: usize,
1035 pub hash_ring: Vec<(u64, LibP2PPeerId)>,
1037 pub peer_weights: HashMap<LibP2PPeerId, f64>,
1039 pub connection_counts: HashMap<LibP2PPeerId, usize>,
1041 pub response_times: HashMap<LibP2PPeerId, VecDeque<Duration>>,
1043 pub health_status: HashMap<LibP2PPeerId, bool>,
1045}
1046
1047impl LoadBalancer {
1048 pub fn new(algorithm: LoadBalancingAlgorithm) -> Self {
1050 Self {
1051 algorithm,
1052 round_robin_index: 0,
1053 hash_ring: Vec::new(),
1054 peer_weights: HashMap::new(),
1055 connection_counts: HashMap::new(),
1056 response_times: HashMap::new(),
1057 health_status: HashMap::new(),
1058 }
1059 }
1060
1061 pub fn select_peer(&mut self, available_peers: &[LibP2PPeerId]) -> Option<LibP2PPeerId> {
1063 if available_peers.is_empty() {
1064 return None;
1065 }
1066
1067 match &self.algorithm {
1068 LoadBalancingAlgorithm::RoundRobin => {
1069 let peer = available_peers[self.round_robin_index % available_peers.len()];
1070 self.round_robin_index = (self.round_robin_index + 1) % available_peers.len();
1071 Some(peer)
1072 }
1073 LoadBalancingAlgorithm::WeightedRoundRobin => {
1074 let total_weight: f64 = available_peers
1076 .iter()
1077 .map(|p| self.peer_weights.get(p).unwrap_or(&1.0))
1078 .sum();
1079
1080 let mut rng = rand::thread_rng();
1081 let mut target = rng.gen::<f64>() * total_weight;
1082
1083 for peer in available_peers {
1084 let weight = self.peer_weights.get(peer).unwrap_or(&1.0);
1085 if target < *weight {
1086 return Some(*peer);
1087 }
1088 target -= weight;
1089 }
1090
1091 available_peers.last().copied()
1092 }
1093 LoadBalancingAlgorithm::LeastConnections => available_peers
1094 .iter()
1095 .min_by_key(|p| self.connection_counts.get(p).unwrap_or(&0))
1096 .copied(),
1097 LoadBalancingAlgorithm::LeastResponseTime => available_peers
1098 .iter()
1099 .min_by_key(|p| {
1100 self.response_times
1101 .get(p)
1102 .and_then(|times| times.back())
1103 .map(|d| d.as_millis())
1104 .unwrap_or(u128::MAX)
1105 })
1106 .copied(),
1107 LoadBalancingAlgorithm::Random => {
1108 let mut rng = rand::thread_rng();
1109 available_peers.choose(&mut rng).copied()
1110 }
1111 _ => available_peers.first().copied(),
1112 }
1113 }
1114
1115 pub fn update_metrics(
1117 &mut self,
1118 peer: LibP2PPeerId,
1119 connections: usize,
1120 response_time: Option<Duration>,
1121 ) {
1122 self.connection_counts.insert(peer, connections);
1123
1124 if let Some(rt) = response_time {
1125 self.response_times
1126 .entry(peer)
1127 .or_insert_with(|| VecDeque::with_capacity(10))
1128 .push_back(rt);
1129
1130 if let Some(times) = self.response_times.get_mut(&peer) {
1131 if times.len() > 10 {
1132 times.pop_front();
1133 }
1134 }
1135 }
1136 }
1137}
1138
1139#[derive(Debug)]
1141pub struct PeerSelector {
1142 pub geo_preferences: GeoPreferences,
1144 pub required_capabilities: Vec<String>,
1146 pub strategy: PeerSelectionStrategy,
1148 pub recent_selections: VecDeque<LibP2PPeerId>,
1150 pub selection_history: HashMap<LibP2PPeerId, usize>,
1152}
1153
1154impl PeerSelector {
1155 pub fn new(geo_preferences: GeoPreferences) -> Self {
1157 Self {
1158 geo_preferences,
1159 required_capabilities: vec![],
1160 strategy: PeerSelectionStrategy::BestFirst,
1161 recent_selections: VecDeque::with_capacity(100),
1162 selection_history: HashMap::new(),
1163 }
1164 }
1165
1166 pub fn select_peers(
1168 &mut self,
1169 candidates: &[DiscoveredPeer],
1170 count: usize,
1171 scoring_config: &PeerScoringConfig,
1172 ) -> Vec<LibP2PPeerId> {
1173 let mut selected = Vec::new();
1174
1175 let mut eligible: Vec<_> = candidates
1177 .iter()
1178 .filter(|p| p.is_healthy() && !p.is_blacklisted)
1179 .filter(|p| p.connection_quality.reliability_score >= 0.5)
1180 .filter(|p| self.meets_capability_requirements(p))
1181 .collect();
1182
1183 match &self.strategy {
1185 PeerSelectionStrategy::BestFirst => {
1186 eligible.sort_by(|a, b| {
1187 b.calculate_priority(scoring_config)
1188 .partial_cmp(&a.calculate_priority(scoring_config))
1189 .unwrap_or(std::cmp::Ordering::Equal)
1190 });
1191 }
1192 PeerSelectionStrategy::Diversity => {
1193 eligible.sort_by_key(|p| {
1195 self.recent_selections
1196 .iter()
1197 .position(|id| id == &p.peer_id)
1198 .unwrap_or(usize::MAX)
1199 });
1200 }
1201 PeerSelectionStrategy::Probabilistic => {
1202 let mut rng = rand::thread_rng();
1204 eligible.shuffle(&mut rng);
1205 }
1206 _ => {}
1207 }
1208
1209 for peer in eligible.into_iter().take(count) {
1211 selected.push(peer.peer_id);
1212 self.recent_selections.push_back(peer.peer_id);
1213 if self.recent_selections.len() > 100 {
1214 self.recent_selections.pop_front();
1215 }
1216 *self.selection_history.entry(peer.peer_id).or_insert(0) += 1;
1217 }
1218
1219 selected
1220 }
1221
1222 fn meets_capability_requirements(&self, peer: &DiscoveredPeer) -> bool {
1224 if self.required_capabilities.is_empty() {
1225 return true;
1226 }
1227
1228 for required in &self.required_capabilities {
1229 if !peer.capabilities.supported_message_types.contains(required)
1230 && !peer.capabilities.protocol_versions.contains(required)
1231 {
1232 return false;
1233 }
1234 }
1235
1236 true
1237 }
1238}
1239
/// Ordering strategies available to the peer selector.
#[derive(Debug, Clone)]
pub enum PeerSelectionStrategy {
    /// Highest priority first.
    BestFirst,
    /// Random order.
    Probabilistic,
    /// Order based on how recently peers were selected.
    Diversity,
    /// Epsilon-greedy selection.
    EpsilonGreedy {
        /// The epsilon parameter.
        /// NOTE(review): presumably the exploration probability; confirm.
        epsilon: f64,
    },
    /// Multi-armed bandit selection.
    MultiArmedBandit,
}
1254
1255#[derive(Debug)]
1257pub struct TopologyOptimizer {
1258 pub target_clustering: f64,
1260 pub target_path_length: f64,
1262 pub min_connectivity: usize,
1264 pub last_optimization: Instant,
1266 pub optimization_interval: Duration,
1268 pub metrics_history: VecDeque<TopologyMetrics>,
1270}
1271
1272impl TopologyOptimizer {
1273 pub fn new() -> Self {
1275 Self {
1276 target_clustering: 0.3,
1277 target_path_length: 4.0,
1278 min_connectivity: 3,
1279 last_optimization: Instant::now(),
1280 optimization_interval: Duration::from_secs(300),
1281 metrics_history: VecDeque::with_capacity(100),
1282 }
1283 }
1284}
1285
/// A snapshot of topology metrics.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TopologyMetrics {
    /// When the snapshot was taken.
    timestamp: Instant,
    /// Clustering coefficient.
    clustering_coefficient: f64,
    /// Average path length.
    avg_path_length: f64,
    /// Diameter, when known.
    diameter: Option<usize>,
    /// Number of connected components.
    connected_components: usize,
    /// Size of the largest component.
    largest_component_size: usize,
    /// Small-world coefficient.
    small_world_coefficient: f64,
}
1305
1306#[derive(Debug)]
1308pub struct HealthChecker {
1309 pub check_interval: Duration,
1311 pub check_timeout: Duration,
1313 pub last_checks: HashMap<LibP2PPeerId, Instant>,
1315 pub health_results: HashMap<LibP2PPeerId, HealthCheckResult>,
1317 pub config: HealthCheckConfig,
1319}
1320
1321impl HealthChecker {
1322 pub fn new(check_interval: Duration) -> Self {
1324 Self {
1325 check_interval,
1326 check_timeout: Duration::from_secs(5),
1327 last_checks: HashMap::new(),
1328 health_results: HashMap::new(),
1329 config: HealthCheckConfig::default(),
1330 }
1331 }
1332}
1333
1334impl Default for HealthCheckConfig {
1335 fn default() -> Self {
1336 Self {
1337 enable_ping: true,
1338 enable_capability_check: true,
1339 enable_performance_monitoring: true,
1340 unhealthy_multiplier: 2.0,
1341 failure_threshold: 3,
1342 }
1343 }
1344}
1345
/// Settings controlling which health checks run and when a peer counts as
/// unhealthy.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct HealthCheckConfig {
    /// Whether ping checks are enabled.
    enable_ping: bool,
    /// Whether capability checks are enabled.
    enable_capability_check: bool,
    /// Whether performance monitoring is enabled.
    enable_performance_monitoring: bool,
    /// Multiplier applied for unhealthy peers.
    /// NOTE(review): what it multiplies is not visible here.
    unhealthy_multiplier: f64,
    /// Failure threshold.
    failure_threshold: usize,
}
1361
/// Outcome of a health check for one peer.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct HealthCheckResult {
    /// Whether the peer is considered healthy.
    is_healthy: bool,
    /// Measured response time, if any.
    response_time: Option<Duration>,
    /// When a check last succeeded, if ever.
    last_successful: Option<Instant>,
    /// Number of consecutive failures.
    consecutive_failures: usize,
    /// Per-check pass/fail details keyed by check name.
    check_details: HashMap<String, bool>,
    /// Error messages collected during the check.
    errors: Vec<String>,
}
1379
1380#[derive(Debug)]
1382pub struct PerformanceMonitor {
1383 pub collection_interval: Duration,
1385 pub last_collection: Instant,
1387 pub system_metrics: SystemPerformanceMetrics,
1389 pub peer_metrics: HashMap<LibP2PPeerId, PeerPerformanceMetrics>,
1391 pub alerts: VecDeque<PerformanceAlert>,
1393}
1394
1395impl PerformanceMonitor {
1396 pub fn new() -> Self {
1398 Self {
1399 collection_interval: Duration::from_secs(60),
1400 last_collection: Instant::now(),
1401 system_metrics: SystemPerformanceMetrics::default(),
1402 peer_metrics: HashMap::new(),
1403 alerts: VecDeque::with_capacity(100),
1404 }
1405 }
1406}
1407
/// System-wide performance figures for the discovery service.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct SystemPerformanceMetrics {
    /// Rate of peer discovery.
    discovery_rate: f64,
    /// Connection success rate.
    connection_success_rate: f64,
    /// Average time to establish a connection.
    avg_connection_time: Duration,
    /// Memory usage in bytes.
    memory_usage_bytes: usize,
    /// CPU usage in percent.
    cpu_usage_percent: f64,
    /// Network utilisation.
    /// NOTE(review): the `bps` suffix is ambiguous (bits or bytes); confirm.
    network_utilization_bps: u64,
    /// DHT overhead in percent.
    dht_overhead_percent: f64,
}
1427
1428#[derive(Debug, Clone)]
1430#[allow(dead_code)]
1431pub struct PerformanceAlert {
1432 timestamp: Instant,
1434 severity: AlertSeverity,
1436 category: AlertCategory,
1438 message: String,
1440 peer_id: Option<LibP2PPeerId>,
1442 suggested_actions: Vec<String>,
1444}
1445
/// Alert severity levels.
///
/// The derived ordering follows declaration order, so
/// `Info < Warning < Error < Critical`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    /// Informational.
    Info,
    /// Warning.
    Warning,
    /// Error.
    Error,
    /// Critical.
    Critical,
}
1458
/// Subject area an alert belongs to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlertCategory {
    /// Performance.
    Performance,
    /// Connectivity.
    Connectivity,
    /// Security.
    Security,
    /// Resource usage.
    Resource,
    /// Configuration.
    Configuration,
}
1473
1474impl KademliaPeerDiscovery {
1475 pub fn new(config: DiscoveryConfig) -> Self {
1477 let max_connections = config.max_concurrent_connections;
1478
1479 Self {
1480 discovered_peers: Arc::new(RwLock::new(HashMap::new())),
1481 static_peers: HashSet::new(),
1482 bootstrap_completed: false,
1483 discovery_active: false,
1484 event_tx: None,
1485 bootstrap_tried: HashSet::new(),
1486 last_discovery: None,
1487 dark_resolver: Arc::new(DarkResolver::new()),
1488 shadow_handler: DefaultShadowAddressHandler::new(
1489 NetworkType::Mainnet,
1490 [0u8; 32], ),
1492 dht_buckets: Arc::new(RwLock::new(BTreeMap::new())),
1493 connection_semaphore: Arc::new(Semaphore::new(max_connections)),
1494 load_balancer: Arc::new(Mutex::new(LoadBalancer::new(
1495 config.load_balancing_config.algorithm.clone(),
1496 ))),
1497 peer_selector: Arc::new(Mutex::new(PeerSelector::new(
1498 config.geo_preferences.clone(),
1499 ))),
1500 topology_optimizer: Arc::new(Mutex::new(TopologyOptimizer::new())),
1501 health_checker: Arc::new(Mutex::new(HealthChecker::new(
1502 config.load_balancing_config.health_check_interval,
1503 ))),
1504 performance_monitor: Arc::new(Mutex::new(PerformanceMonitor::new())),
1505 bootstrap_strategy: BootstrapStrategy::Adaptive {
1506 aggressiveness: 0.5,
1507 last_adapted: Instant::now(),
1508 },
1509 config,
1510 }
1511 }
1512
1513 pub fn set_event_channel(&mut self, tx: mpsc::Sender<DiscoveryEvent>) {
1515 self.event_tx = Some(tx);
1516 }
1517
1518 pub async fn start(&mut self) -> Result<(), NetworkError> {
1520 if self.discovery_active {
1521 return Ok(());
1522 }
1523
1524 self.discovery_active = true;
1525 info!("Starting Kademlia peer discovery service");
1526
1527 self.bootstrap().await?;
1529
1530 self.start_periodic_discovery().await;
1532
1533 Ok(())
1534 }
1535
1536 pub async fn stop(&mut self) -> Result<(), NetworkError> {
1538 self.discovery_active = false;
1539 info!("Stopping Kademlia peer discovery service");
1540 Ok(())
1541 }
1542
1543 async fn bootstrap(&mut self) -> Result<(), NetworkError> {
1545 if self.bootstrap_completed {
1546 return Ok(());
1547 }
1548
1549 info!(
1550 "Starting DHT bootstrap with {} nodes",
1551 self.config.bootstrap_nodes.len()
1552 );
1553 let start_time = Instant::now();
1554 let mut discovered_peers = 0;
1555
1556 for bootstrap_addr in &self.config.bootstrap_nodes {
1557 if self.bootstrap_tried.contains(bootstrap_addr) {
1558 continue;
1559 }
1560
1561 self.bootstrap_tried.insert(*bootstrap_addr);
1562
1563 let peer_id = LibP2PPeerId::random(); let discovered_peer =
1566 DiscoveredPeer::new(peer_id, *bootstrap_addr, DiscoveryMethod::Bootstrap);
1567
1568 self.discovered_peers
1570 .write()
1571 .await
1572 .insert(peer_id, discovered_peer.clone());
1573 discovered_peers += 1;
1574
1575 if let Some(tx) = &self.event_tx {
1577 let _ = tx
1578 .send(DiscoveryEvent::PeerDiscovered(discovered_peer))
1579 .await;
1580 }
1581
1582 debug!("Added bootstrap peer: {} -> {:?}", bootstrap_addr, peer_id);
1583 }
1584
1585 self.bootstrap_completed = true;
1586
1587 if let Some(tx) = &self.event_tx {
1588 let _ = tx
1589 .send(DiscoveryEvent::BootstrapCompleted {
1590 peers_discovered: discovered_peers,
1591 duration: start_time.elapsed(),
1592 success_rate: discovered_peers as f64
1593 / self.config.bootstrap_nodes.len().max(1) as f64,
1594 })
1595 .await;
1596 }
1597
1598 info!("DHT bootstrap completed");
1599 Ok(())
1600 }
1601
1602 async fn start_periodic_discovery(&mut self) {
1604 let interval = Duration::from_secs(self.config.interval);
1605 let discovered_peers = Arc::clone(&self.discovered_peers);
1606 let event_tx = self.event_tx.clone();
1607 let methods = self.config.methods.clone();
1608 let max_peers = self.config.max_peers;
1609
1610 tokio::spawn(async move {
1611 let mut interval_timer = tokio::time::interval(interval);
1612
1613 loop {
1614 interval_timer.tick().await;
1615
1616 for method in &methods {
1618 match method {
1619 DiscoveryMethod::Kademlia => {
1620 Self::discover_kademlia_peers(&discovered_peers, &event_tx, max_peers)
1621 .await;
1622 }
1623 DiscoveryMethod::Mdns => {
1624 Self::discover_mdns_peers(&discovered_peers, &event_tx).await;
1625 }
1626 _ => {} }
1628 }
1629 }
1630 });
1631 }
1632
1633 async fn discover_kademlia_peers(
1635 discovered_peers: &Arc<RwLock<HashMap<LibP2PPeerId, DiscoveredPeer>>>,
1636 event_tx: &Option<mpsc::Sender<DiscoveryEvent>>,
1637 max_peers: usize,
1638 ) {
1639 let current_count = discovered_peers.read().await.len();
1640 if current_count >= max_peers {
1641 return;
1642 }
1643
1644 let peers_to_discover = (max_peers - current_count).min(5);
1646
1647 for _ in 0..peers_to_discover {
1648 let peer_id = LibP2PPeerId::random();
1649 let address =
1650 SocketAddr::from(([192, 168, 1, 100], 8000 + rand::random::<u16>() % 1000));
1651
1652 let discovered_peer = DiscoveredPeer::new(peer_id, address, DiscoveryMethod::Kademlia);
1653
1654 discovered_peers
1656 .write()
1657 .await
1658 .insert(peer_id, discovered_peer.clone());
1659
1660 if let Some(tx) = event_tx {
1662 let _ = tx
1663 .send(DiscoveryEvent::PeerDiscovered(discovered_peer))
1664 .await;
1665 }
1666
1667 debug!("Discovered peer via Kademlia: {:?} at {}", peer_id, address);
1668 }
1669 }
1670
1671 async fn discover_mdns_peers(
1673 discovered_peers: &Arc<RwLock<HashMap<LibP2PPeerId, DiscoveredPeer>>>,
1674 event_tx: &Option<mpsc::Sender<DiscoveryEvent>>,
1675 ) {
1676 let local_peers = 2; for _ in 0..local_peers {
1680 let peer_id = LibP2PPeerId::random();
1681 let address = SocketAddr::from(([192, 168, 1, 10 + rand::random::<u8>() % 50], 8000));
1682
1683 if discovered_peers.read().await.contains_key(&peer_id) {
1685 continue;
1686 }
1687
1688 let discovered_peer = DiscoveredPeer::new(peer_id, address, DiscoveryMethod::Mdns);
1689
1690 discovered_peers
1692 .write()
1693 .await
1694 .insert(peer_id, discovered_peer.clone());
1695
1696 if let Some(tx) = event_tx {
1698 let _ = tx
1699 .send(DiscoveryEvent::PeerDiscovered(discovered_peer))
1700 .await;
1701 }
1702
1703 debug!("Discovered peer via mDNS: {:?} at {}", peer_id, address);
1704 }
1705 }
1706
1707 pub async fn get_discovered_peers(&self) -> Vec<DiscoveredPeer> {
1709 self.discovered_peers
1710 .read()
1711 .await
1712 .values()
1713 .cloned()
1714 .collect()
1715 }
1716
1717 pub async fn get_connectable_peers(&self) -> Vec<DiscoveredPeer> {
1719 self.discovered_peers
1720 .read()
1721 .await
1722 .values()
1723 .filter(|peer| peer.should_attempt_connection())
1724 .cloned()
1725 .collect()
1726 }
1727
1728 pub async fn update_peer_reputation(&self, peer_id: LibP2PPeerId, delta: f64) {
1730 if let Some(peer) = self.discovered_peers.write().await.get_mut(&peer_id) {
1731 let old_reputation = peer.reputation;
1732 peer.reputation += delta;
1733 peer.reputation = peer.reputation.clamp(-50.0, 50.0);
1734
1735 if let Some(tx) = &self.event_tx {
1736 let _ = tx
1737 .send(DiscoveryEvent::ReputationUpdated {
1738 peer_id,
1739 old_reputation,
1740 new_reputation: peer.reputation,
1741 reason: "Connection update".to_string(),
1742 })
1743 .await;
1744 }
1745 }
1746 }
1747
1748 pub async fn record_connection_attempt(&self, peer_id: LibP2PPeerId, success: bool) {
1750 if let Some(peer) = self.discovered_peers.write().await.get_mut(&peer_id) {
1751 peer.record_connection_attempt(success, &self.config.scoring_config);
1752
1753 if success {
1754 info!("Successful connection to peer: {:?}", peer_id);
1755 } else {
1756 warn!(
1757 "Failed connection to peer: {:?} (attempts: {})",
1758 peer_id, peer.connection_attempts
1759 );
1760 }
1761 }
1762 }
1763
1764 pub fn add_static_peer(&mut self, address: SocketAddr) {
1766 self.static_peers.insert(address);
1767 info!("Added static peer: {}", address);
1768 }
1769
1770 pub async fn cleanup_old_peers(&self) {
1772 let cutoff = Instant::now() - Duration::from_secs(3600);
1773
1774 self.discovered_peers.write().await.retain(|peer_id, peer| {
1775 let keep = peer.discovered_at > cutoff;
1776 if !keep {
1777 debug!("Removing old discovered peer: {:?}", peer_id);
1778 }
1779 keep
1780 });
1781 }
1782
1783 pub async fn get_discovery_stats(&self) -> DiscoveryStats {
1785 let peers = self.discovered_peers.read().await;
1786 let total_peers = peers.len();
1787
1788 let mut method_counts = HashMap::new();
1789 let mut avg_reputation = 0.0;
1790 let mut connectable_count = 0;
1791
1792 for peer in peers.values() {
1793 *method_counts
1794 .entry(peer.discovery_method.clone())
1795 .or_insert(0) += 1;
1796 avg_reputation += peer.reputation;
1797
1798 if peer.should_attempt_connection() {
1799 connectable_count += 1;
1800 }
1801 }
1802
1803 if total_peers > 0 {
1804 avg_reputation /= total_peers as f64;
1805 }
1806
1807 DiscoveryStats {
1808 total_discovered: total_peers,
1809 connectable_peers: connectable_count,
1810 method_counts,
1811 average_reputation: avg_reputation,
1812 bootstrap_completed: self.bootstrap_completed,
1813 }
1814 }
1815}
1816
1817#[derive(Debug, Clone)]
1819pub struct DiscoveryStats {
1820 pub total_discovered: usize,
1822 pub connectable_peers: usize,
1824 pub method_counts: HashMap<DiscoveryMethod, usize>,
1826 pub average_reputation: f64,
1828 pub bootstrap_completed: bool,
1830}
1831
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::timeout;

    /// Builds a default-configured service whose peer table holds exactly one
    /// Kademlia-discovered peer at `127.0.0.1:8000`, and returns that peer's id.
    async fn discovery_with_single_peer() -> (KademliaPeerDiscovery, LibP2PPeerId) {
        let service = KademliaPeerDiscovery::new(DiscoveryConfig::default());

        let id = LibP2PPeerId::random();
        let entry = DiscoveredPeer::new(
            id,
            SocketAddr::from(([127, 0, 0, 1], 8000)),
            DiscoveryMethod::Kademlia,
        );
        service.discovered_peers.write().await.insert(id, entry);

        (service, id)
    }

    /// A freshly built service is neither active nor bootstrapped.
    #[tokio::test]
    async fn test_discovery_creation() {
        let service = KademliaPeerDiscovery::new(DiscoveryConfig::default());

        assert!(!service.discovery_active);
        assert!(!service.bootstrap_completed);
    }

    /// Bootstrapping with one node sets the completion flag and emits an event.
    #[tokio::test]
    async fn test_bootstrap() {
        let config = DiscoveryConfig {
            bootstrap_nodes: vec![SocketAddr::from(([127, 0, 0, 1], 8000))],
            ..DiscoveryConfig::default()
        };

        let mut service = KademliaPeerDiscovery::new(config);
        let (sender, mut receiver) = mpsc::channel(10);
        service.set_event_channel(sender);

        service.bootstrap().await.unwrap();
        assert!(service.bootstrap_completed);

        // An event must be available before the timeout elapses.
        let received = timeout(Duration::from_millis(100), receiver.recv()).await;
        assert!(received.is_ok());
    }

    /// A positive delta is added to the stored reputation.
    #[tokio::test]
    async fn test_peer_reputation() {
        let (service, id) = discovery_with_single_peer().await;

        service.update_peer_reputation(id, 5.0).await;

        let snapshot = service.get_discovered_peers().await;
        assert_eq!(snapshot[0].reputation, 5.0);
    }

    /// A failed attempt is counted and lowers the reputation below zero.
    #[tokio::test]
    async fn test_connection_attempts() {
        let (service, id) = discovery_with_single_peer().await;

        service.record_connection_attempt(id, false).await;

        let snapshot = service.get_discovered_peers().await;
        assert_eq!(snapshot[0].connection_attempts, 1);
        assert!(snapshot[0].reputation < 0.0);
    }
}