use crate::dark_resolver::DarkResolver;
use crate::shadow_address::{DefaultShadowAddressHandler, NetworkType, ShadowAddress};
use crate::types::NetworkError;
use libp2p::PeerId as LibP2PPeerId;
use rand::{seq::SliceRandom, Rng};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock, Semaphore};
use tracing::{debug, info, warn};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DiscoveryMethod {
Kademlia,
Static,
Mdns,
Bootstrap,
DarkAddress,
DNS,
Gossip,
Hybrid(Vec<DiscoveryMethod>),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NetworkConfig {
Public {
nat_traversal: bool,
upnp: bool,
stun_turn: bool,
},
Private {
allowed_ranges: Vec<String>,
require_auth: bool,
},
Hybrid {
public: Box<NetworkConfig>,
private: Box<NetworkConfig>,
fallback_public: bool,
},
}
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
pub methods: Vec<DiscoveryMethod>,
pub bootstrap_nodes: Vec<SocketAddr>,
pub interval: u64,
pub max_peers: usize,
pub min_peers: usize,
pub reputation_threshold: f64,
pub network_config: NetworkConfig,
pub enable_dark_addressing: bool,
pub dark_resolver_config: DarkResolverConfig,
pub dht_config: DHTConfig,
pub max_concurrent_connections: usize,
pub scoring_config: PeerScoringConfig,
pub load_balancing_config: LoadBalancingConfig,
pub geo_preferences: GeoPreferences,
}
#[derive(Debug, Clone)]
pub struct DHTConfig {
pub bucket_size: usize,
pub alpha: usize,
pub replication_factor: usize,
pub key_space_bits: usize,
pub bootstrap_timeout: Duration,
pub refresh_interval: Duration,
pub enable_republishing: bool,
}
#[derive(Debug, Clone)]
pub struct DarkResolverConfig {
pub max_cache_size: usize,
pub cache_ttl: Duration,
pub enable_distributed: bool,
pub fallback_dns: Vec<String>,
pub max_resolution_attempts: usize,
}
#[derive(Debug, Clone)]
pub struct PeerScoringConfig {
pub initial_score: f64,
pub max_score: f64,
pub min_score: f64,
pub score_decay_rate: f64,
pub connection_success_bonus: f64,
pub connection_failure_penalty: f64,
pub uptime_bonus: f64,
pub latency_penalty_factor: f64,
pub enable_geographic_scoring: bool,
}
#[derive(Debug, Clone)]
pub struct LoadBalancingConfig {
pub algorithm: LoadBalancingAlgorithm,
pub health_check_interval: Duration,
pub max_load_per_peer: f64,
pub enable_adaptive: bool,
pub circuit_breaker: CircuitBreakerConfig,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoadBalancingAlgorithm {
RoundRobin,
WeightedRoundRobin,
LeastConnections,
LeastResponseTime,
Random,
ConsistentHashing,
ResourceBased,
}
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
pub failure_threshold: usize,
pub success_threshold: usize,
pub timeout: Duration,
pub half_open_delay: Duration,
}
#[derive(Debug, Clone)]
pub struct GeoPreferences {
pub prefer_local: bool,
pub local_latency_threshold: Duration,
pub preferred_regions: Vec<String>,
pub avoided_regions: Vec<String>,
pub enable_geo_diversity: bool,
}
impl Default for DiscoveryConfig {
fn default() -> Self {
Self {
methods: vec![DiscoveryMethod::Kademlia, DiscoveryMethod::Mdns],
bootstrap_nodes: vec![],
interval: 30,
max_peers: 50,
min_peers: 8,
reputation_threshold: 0.0,
network_config: NetworkConfig::Public {
nat_traversal: true,
upnp: true,
stun_turn: true,
},
enable_dark_addressing: true,
dark_resolver_config: DarkResolverConfig::default(),
dht_config: DHTConfig::default(),
max_concurrent_connections: 100,
scoring_config: PeerScoringConfig::default(),
load_balancing_config: LoadBalancingConfig::default(),
geo_preferences: GeoPreferences::default(),
}
}
}
impl Default for DHTConfig {
fn default() -> Self {
Self {
bucket_size: 20,
alpha: 3,
replication_factor: 20,
key_space_bits: 256,
bootstrap_timeout: Duration::from_secs(30),
refresh_interval: Duration::from_secs(3600),
enable_republishing: true,
}
}
}
impl Default for DarkResolverConfig {
fn default() -> Self {
Self {
max_cache_size: 10000,
cache_ttl: Duration::from_secs(3600),
enable_distributed: true,
fallback_dns: vec!["8.8.8.8".to_string(), "1.1.1.1".to_string()],
max_resolution_attempts: 3,
}
}
}
impl Default for PeerScoringConfig {
fn default() -> Self {
Self {
initial_score: 50.0,
max_score: 100.0,
min_score: -50.0,
score_decay_rate: 0.1,
connection_success_bonus: 5.0,
connection_failure_penalty: 10.0,
uptime_bonus: 1.0,
latency_penalty_factor: 0.01,
enable_geographic_scoring: true,
}
}
}
impl Default for LoadBalancingConfig {
fn default() -> Self {
Self {
algorithm: LoadBalancingAlgorithm::WeightedRoundRobin,
health_check_interval: Duration::from_secs(30),
max_load_per_peer: 100.0,
enable_adaptive: true,
circuit_breaker: CircuitBreakerConfig::default(),
}
}
}
impl Default for CircuitBreakerConfig {
fn default() -> Self {
Self {
failure_threshold: 5,
success_threshold: 3,
timeout: Duration::from_secs(60),
half_open_delay: Duration::from_secs(30),
}
}
}
impl Default for GeoPreferences {
fn default() -> Self {
Self {
prefer_local: true,
local_latency_threshold: Duration::from_millis(100),
preferred_regions: vec![],
avoided_regions: vec![],
enable_geo_diversity: true,
}
}
}
#[derive(Debug, Clone)]
pub struct DiscoveredPeer {
pub peer_id: LibP2PPeerId,
pub addresses: Vec<SocketAddr>,
pub dark_addresses: Vec<ShadowAddress>,
pub discovered_at: Instant,
pub discovery_method: DiscoveryMethod,
pub reputation: f64,
pub connection_attempts: u32,
pub successful_connections: u32,
pub last_connection_attempt: Option<Instant>,
pub last_successful_connection: Option<Instant>,
pub protocols: Vec<String>,
pub geographic_info: Option<GeographicInfo>,
pub performance_metrics: PeerPerformanceMetrics,
pub load_metrics: PeerLoadMetrics,
pub capabilities: PeerCapabilities,
pub connection_quality: ConnectionQuality,
pub is_blacklisted: bool,
pub blacklist_reason: Option<String>,
pub uptime_stats: UptimeStats,
pub circuit_breaker_state: CircuitBreakerState,
}
#[derive(Debug, Clone)]
pub struct GeographicInfo {
pub country_code: String,
pub city: Option<String>,
pub latitude: Option<f64>,
pub longitude: Option<f64>,
pub estimated_distance_km: Option<f64>,
pub asn: Option<u32>,
pub isp: Option<String>,
}
#[derive(Debug, Clone, Default)]
pub struct PeerPerformanceMetrics {
pub avg_response_time: Duration,
pub min_response_time: Duration,
pub max_response_time: Duration,
pub p95_response_time: Duration,
pub throughput_mps: f64,
pub bandwidth_bps: u64,
pub error_rate: f64,
pub jitter: Duration,
pub packet_loss_rate: f64,
}
#[derive(Debug, Clone, Default)]
pub struct PeerLoadMetrics {
pub active_connections: usize,
pub load_score: f64,
pub cpu_utilization: Option<f64>,
pub memory_utilization: Option<f64>,
pub network_utilization: Option<f64>,
pub queue_depth: usize,
pub weight: f64,
}
#[derive(Debug, Clone, Default)]
pub struct PeerCapabilities {
pub protocol_versions: Vec<String>,
pub max_concurrent_connections: Option<usize>,
pub supported_message_types: Vec<String>,
pub supports_dark_addressing: bool,
pub supports_onion_routing: bool,
pub participates_in_dht: bool,
pub can_relay: bool,
pub provides_storage: bool,
pub bandwidth_capacity: Option<u64>,
}
#[derive(Debug, Clone, Default)]
pub struct ConnectionQuality {
pub overall_score: f64,
pub reliability_score: f64,
pub performance_score: f64,
pub availability_score: f64,
pub security_score: f64,
pub last_assessed: Option<Instant>,
}
#[derive(Debug, Clone, Default)]
pub struct UptimeStats {
pub total_observed_time: Duration,
pub total_uptime: Duration,
pub uptime_percentage: f64,
pub disconnection_count: u32,
pub avg_session_duration: Duration,
pub longest_session_duration: Duration,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CircuitBreakerState {
Closed,
Open {
opened_at: Instant,
failure_count: usize,
},
HalfOpen {
test_requests: usize,
successful_tests: usize,
},
}
impl DiscoveredPeer {
pub fn new(peer_id: LibP2PPeerId, address: SocketAddr, method: DiscoveryMethod) -> Self {
Self {
peer_id,
addresses: vec![address],
dark_addresses: vec![],
discovered_at: Instant::now(),
discovery_method: method,
reputation: 50.0, connection_attempts: 0,
successful_connections: 0,
last_connection_attempt: None,
last_successful_connection: None,
protocols: vec![],
geographic_info: None,
performance_metrics: PeerPerformanceMetrics::default(),
load_metrics: PeerLoadMetrics::default(),
capabilities: PeerCapabilities::default(),
connection_quality: ConnectionQuality::default(),
is_blacklisted: false,
blacklist_reason: None,
uptime_stats: UptimeStats::default(),
circuit_breaker_state: CircuitBreakerState::Closed,
}
}
pub fn should_attempt_connection(&self) -> bool {
self.should_attempt_connection_with_config(&PeerScoringConfig::default())
}
pub fn should_attempt_connection_with_config(&self, config: &PeerScoringConfig) -> bool {
if self.is_blacklisted {
return false;
}
match &self.circuit_breaker_state {
CircuitBreakerState::Open { opened_at, .. } => {
if opened_at.elapsed() < Duration::from_secs(60) {
return false;
}
}
CircuitBreakerState::HalfOpen { test_requests, .. } => {
if *test_requests >= 3 {
return false;
}
}
CircuitBreakerState::Closed => {}
}
if self.reputation < config.min_score {
return false;
}
if self.connection_attempts > 3 {
if let Some(last_attempt) = self.last_connection_attempt {
let backoff_time =
Duration::from_secs((self.connection_attempts as u64).pow(2) * 30);
if last_attempt.elapsed() < backoff_time {
return false;
}
}
}
if self.connection_quality.overall_score < 0.3 {
return false;
}
true
}
pub fn record_connection_attempt(&mut self, success: bool, config: &PeerScoringConfig) {
self.connection_attempts += 1;
self.last_connection_attempt = Some(Instant::now());
if success {
self.successful_connections += 1;
self.last_successful_connection = Some(Instant::now());
self.reputation += config.connection_success_bonus;
self.reputation = self.reputation.min(config.max_score);
match &self.circuit_breaker_state {
CircuitBreakerState::HalfOpen {
successful_tests, ..
} => {
let new_successful = successful_tests + 1;
if new_successful >= 3 {
self.circuit_breaker_state = CircuitBreakerState::Closed;
} else {
self.circuit_breaker_state = CircuitBreakerState::HalfOpen {
test_requests: 0,
successful_tests: new_successful,
};
}
}
_ => {
self.circuit_breaker_state = CircuitBreakerState::Closed;
}
}
self.update_connection_quality(true);
} else {
self.reputation -= config.connection_failure_penalty;
self.reputation = self.reputation.max(config.min_score);
match &self.circuit_breaker_state {
CircuitBreakerState::Closed => {
if self.connection_attempts >= 5 {
self.circuit_breaker_state = CircuitBreakerState::Open {
opened_at: Instant::now(),
failure_count: self.connection_attempts as usize,
};
}
}
CircuitBreakerState::HalfOpen { .. } => {
self.circuit_breaker_state = CircuitBreakerState::Open {
opened_at: Instant::now(),
failure_count: self.connection_attempts as usize,
};
}
_ => {}
}
self.update_connection_quality(false);
}
}
fn update_connection_quality(&mut self, _success: bool) {
let success_rate = if self.connection_attempts > 0 {
self.successful_connections as f64 / self.connection_attempts as f64
} else {
0.0
};
self.connection_quality.reliability_score = success_rate;
let performance_factor = 1.0 - (self.performance_metrics.error_rate * 0.5);
let availability_factor = self.uptime_stats.uptime_percentage / 100.0;
self.connection_quality.overall_score = (self.connection_quality.reliability_score * 0.4
+ performance_factor * 0.3
+ availability_factor * 0.2
+ self.connection_quality.security_score * 0.1)
.clamp(0.0, 1.0);
self.connection_quality.last_assessed = Some(Instant::now());
}
pub fn update_performance_metrics(&mut self, response_time: Duration, success: bool) {
if success {
if self.performance_metrics.min_response_time == Duration::ZERO {
self.performance_metrics.min_response_time = response_time;
self.performance_metrics.max_response_time = response_time;
self.performance_metrics.avg_response_time = response_time;
} else {
self.performance_metrics.min_response_time = self
.performance_metrics
.min_response_time
.min(response_time);
self.performance_metrics.max_response_time = self
.performance_metrics
.max_response_time
.max(response_time);
let alpha = 0.1;
let current_avg = self.performance_metrics.avg_response_time.as_secs_f64();
let new_avg = alpha * response_time.as_secs_f64() + (1.0 - alpha) * current_avg;
self.performance_metrics.avg_response_time = Duration::from_secs_f64(new_avg);
}
}
let total_requests = self.connection_attempts as f64;
let failed_requests = (self.connection_attempts - self.successful_connections) as f64;
self.performance_metrics.error_rate = if total_requests > 0.0 {
failed_requests / total_requests
} else {
0.0
};
}
pub fn update_load_metrics(&mut self, active_connections: usize, queue_depth: usize) {
self.load_metrics.active_connections = active_connections;
self.load_metrics.queue_depth = queue_depth;
let connection_factor = if let Some(max_conn) = self.capabilities.max_concurrent_connections
{
active_connections as f64 / max_conn as f64
} else {
active_connections as f64 / 100.0 };
let queue_factor = queue_depth as f64 / 50.0;
self.load_metrics.load_score =
((connection_factor + queue_factor) * 50.0).clamp(0.0, 100.0);
self.load_metrics.weight = (100.0 - self.load_metrics.load_score).max(1.0);
}
pub fn is_healthy(&self) -> bool {
!self.is_blacklisted
&& self.circuit_breaker_state == CircuitBreakerState::Closed
&& self.connection_quality.overall_score > 0.5
&& self.load_metrics.load_score < 90.0
}
pub fn calculate_priority(&self, config: &PeerScoringConfig) -> f64 {
let mut priority = self.reputation;
priority += self.connection_quality.overall_score * 20.0;
priority += (100.0 - self.load_metrics.load_score) * 0.1;
if let Some(geo_info) = &self.geographic_info {
if config.enable_geographic_scoring {
if let Some(distance) = geo_info.estimated_distance_km {
let distance_bonus = (1000.0 - distance.min(1000.0)) / 100.0;
priority += distance_bonus;
}
}
}
priority += self.uptime_stats.uptime_percentage * 0.1;
priority.max(0.0)
}
pub fn add_dark_address(&mut self, address: ShadowAddress) {
if !self.dark_addresses.contains(&address) {
self.dark_addresses.push(address);
self.capabilities.supports_dark_addressing = true;
}
}
pub fn update_geographic_info(&mut self, geo_info: GeographicInfo) {
self.geographic_info = Some(geo_info);
}
pub fn blacklist(&mut self, reason: String) {
self.is_blacklisted = true;
self.blacklist_reason = Some(reason);
self.reputation = -50.0; }
pub fn unblacklist(&mut self) {
self.is_blacklisted = false;
self.blacklist_reason = None;
self.reputation = 0.0; }
pub fn decay_reputation(&mut self, config: &PeerScoringConfig, hours_elapsed: f64) {
let decay_amount = config.score_decay_rate * hours_elapsed;
self.reputation = (self.reputation - decay_amount).max(config.min_score);
}
pub fn update_uptime(&mut self, is_online: bool, duration: Duration) {
self.uptime_stats.total_observed_time += duration;
if is_online {
self.uptime_stats.total_uptime += duration;
} else {
self.uptime_stats.disconnection_count += 1;
}
if self.uptime_stats.total_observed_time > Duration::ZERO {
self.uptime_stats.uptime_percentage = (self.uptime_stats.total_uptime.as_secs_f64()
/ self.uptime_stats.total_observed_time.as_secs_f64())
* 100.0;
}
}
}
#[allow(dead_code)]
pub struct KademliaPeerDiscovery {
config: DiscoveryConfig,
discovered_peers: Arc<RwLock<HashMap<LibP2PPeerId, DiscoveredPeer>>>,
static_peers: HashSet<SocketAddr>,
bootstrap_completed: bool,
discovery_active: bool,
event_tx: Option<mpsc::Sender<DiscoveryEvent>>,
bootstrap_tried: HashSet<SocketAddr>,
last_discovery: Option<Instant>,
dark_resolver: Arc<DarkResolver>,
shadow_handler: DefaultShadowAddressHandler,
dht_buckets: Arc<RwLock<BTreeMap<usize, Vec<LibP2PPeerId>>>>,
connection_semaphore: Arc<Semaphore>,
load_balancer: Arc<Mutex<LoadBalancer>>,
peer_selector: Arc<Mutex<PeerSelector>>,
topology_optimizer: Arc<Mutex<TopologyOptimizer>>,
health_checker: Arc<Mutex<HealthChecker>>,
performance_monitor: Arc<Mutex<PerformanceMonitor>>,
bootstrap_strategy: BootstrapStrategy,
}
#[derive(Debug, Clone)]
pub enum BootstrapStrategy {
Conservative,
Aggressive,
Adaptive {
aggressiveness: f64,
last_adapted: Instant,
},
Custom {
max_concurrent: usize,
attempt_timeout: Duration,
retry_strategy: RetryStrategy,
},
}
#[derive(Debug, Clone)]
pub enum RetryStrategy {
ExponentialBackoff {
initial_delay: Duration,
max_delay: Duration,
multiplier: f64,
},
FixedInterval(Duration),
LinearBackoff {
initial_delay: Duration,
increment: Duration,
},
None,
}
#[derive(Debug, Clone)]
pub enum DiscoveryEvent {
PeerDiscovered(DiscoveredPeer),
ReputationUpdated {
peer_id: LibP2PPeerId,
old_reputation: f64,
new_reputation: f64,
reason: String,
},
BootstrapCompleted {
peers_discovered: usize,
duration: Duration,
success_rate: f64,
},
BootstrapFailed {
reason: String,
attempted_nodes: usize,
successful_connections: usize,
},
PeerConnected {
peer_id: LibP2PPeerId,
address: SocketAddr,
connection_time: Duration,
},
PeerDisconnected {
peer_id: LibP2PPeerId,
reason: String,
session_duration: Duration,
},
PeerBlacklisted {
peer_id: LibP2PPeerId,
reason: String,
reputation: f64,
},
DarkAddressDiscovered {
peer_id: LibP2PPeerId,
dark_address: ShadowAddress,
resolution_time: Duration,
},
TopologyUpdated {
largest_component_size: usize,
avg_clustering: f64,
diameter: Option<usize>,
},
LoadBalancingUpdated {
active_connections: usize,
load_entropy: f64,
overloaded_peers: usize,
},
GeographicDistributionUpdated {
countries: usize,
diversity_score: f64,
avg_distance_km: f64,
},
DHTBucketUpdated {
bucket_index: usize,
peer_count: usize,
health_score: f64,
},
DiscoveryError {
error: String,
category: DiscoveryErrorCategory,
retry_suggested: bool,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryErrorCategory {
NetworkError,
ConfigurationError,
ResourceError,
ProtocolError,
SecurityError,
TimeoutError,
DarkAddressingError,
DHTError,
}
#[derive(Debug)]
pub struct LoadBalancer {
pub algorithm: LoadBalancingAlgorithm,
pub round_robin_index: usize,
pub hash_ring: Vec<(u64, LibP2PPeerId)>,
pub peer_weights: HashMap<LibP2PPeerId, f64>,
pub connection_counts: HashMap<LibP2PPeerId, usize>,
pub response_times: HashMap<LibP2PPeerId, VecDeque<Duration>>,
pub health_status: HashMap<LibP2PPeerId, bool>,
}
impl LoadBalancer {
pub fn new(algorithm: LoadBalancingAlgorithm) -> Self {
Self {
algorithm,
round_robin_index: 0,
hash_ring: Vec::new(),
peer_weights: HashMap::new(),
connection_counts: HashMap::new(),
response_times: HashMap::new(),
health_status: HashMap::new(),
}
}
pub fn select_peer(&mut self, available_peers: &[LibP2PPeerId]) -> Option<LibP2PPeerId> {
if available_peers.is_empty() {
return None;
}
match &self.algorithm {
LoadBalancingAlgorithm::RoundRobin => {
let peer = available_peers[self.round_robin_index % available_peers.len()];
self.round_robin_index = (self.round_robin_index + 1) % available_peers.len();
Some(peer)
}
LoadBalancingAlgorithm::WeightedRoundRobin => {
let total_weight: f64 = available_peers
.iter()
.map(|p| self.peer_weights.get(p).unwrap_or(&1.0))
.sum();
let mut rng = rand::thread_rng();
let mut target = rng.gen::<f64>() * total_weight;
for peer in available_peers {
let weight = self.peer_weights.get(peer).unwrap_or(&1.0);
if target < *weight {
return Some(*peer);
}
target -= weight;
}
available_peers.last().copied()
}
LoadBalancingAlgorithm::LeastConnections => available_peers
.iter()
.min_by_key(|p| self.connection_counts.get(p).unwrap_or(&0))
.copied(),
LoadBalancingAlgorithm::LeastResponseTime => available_peers
.iter()
.min_by_key(|p| {
self.response_times
.get(p)
.and_then(|times| times.back())
.map(|d| d.as_millis())
.unwrap_or(u128::MAX)
})
.copied(),
LoadBalancingAlgorithm::Random => {
let mut rng = rand::thread_rng();
available_peers.choose(&mut rng).copied()
}
_ => available_peers.first().copied(),
}
}
pub fn update_metrics(
&mut self,
peer: LibP2PPeerId,
connections: usize,
response_time: Option<Duration>,
) {
self.connection_counts.insert(peer, connections);
if let Some(rt) = response_time {
self.response_times
.entry(peer)
.or_insert_with(|| VecDeque::with_capacity(10))
.push_back(rt);
if let Some(times) = self.response_times.get_mut(&peer) {
if times.len() > 10 {
times.pop_front();
}
}
}
}
}
#[derive(Debug)]
pub struct PeerSelector {
pub geo_preferences: GeoPreferences,
pub required_capabilities: Vec<String>,
pub strategy: PeerSelectionStrategy,
pub recent_selections: VecDeque<LibP2PPeerId>,
pub selection_history: HashMap<LibP2PPeerId, usize>,
}
impl PeerSelector {
pub fn new(geo_preferences: GeoPreferences) -> Self {
Self {
geo_preferences,
required_capabilities: vec![],
strategy: PeerSelectionStrategy::BestFirst,
recent_selections: VecDeque::with_capacity(100),
selection_history: HashMap::new(),
}
}
pub fn select_peers(
&mut self,
candidates: &[DiscoveredPeer],
count: usize,
scoring_config: &PeerScoringConfig,
) -> Vec<LibP2PPeerId> {
let mut selected = Vec::new();
let mut eligible: Vec<_> = candidates
.iter()
.filter(|p| p.is_healthy() && !p.is_blacklisted)
.filter(|p| p.connection_quality.reliability_score >= 0.5)
.filter(|p| self.meets_capability_requirements(p))
.collect();
match &self.strategy {
PeerSelectionStrategy::BestFirst => {
eligible.sort_by(|a, b| {
b.calculate_priority(scoring_config)
.partial_cmp(&a.calculate_priority(scoring_config))
.unwrap_or(std::cmp::Ordering::Equal)
});
}
PeerSelectionStrategy::Diversity => {
eligible.sort_by_key(|p| {
self.recent_selections
.iter()
.position(|id| id == &p.peer_id)
.unwrap_or(usize::MAX)
});
}
PeerSelectionStrategy::Probabilistic => {
let mut rng = rand::thread_rng();
eligible.shuffle(&mut rng);
}
_ => {}
}
for peer in eligible.into_iter().take(count) {
selected.push(peer.peer_id);
self.recent_selections.push_back(peer.peer_id);
if self.recent_selections.len() > 100 {
self.recent_selections.pop_front();
}
*self.selection_history.entry(peer.peer_id).or_insert(0) += 1;
}
selected
}
fn meets_capability_requirements(&self, peer: &DiscoveredPeer) -> bool {
if self.required_capabilities.is_empty() {
return true;
}
for required in &self.required_capabilities {
if !peer.capabilities.supported_message_types.contains(required)
&& !peer.capabilities.protocol_versions.contains(required)
{
return false;
}
}
true
}
}
#[derive(Debug, Clone)]
pub enum PeerSelectionStrategy {
BestFirst,
Probabilistic,
Diversity,
EpsilonGreedy { epsilon: f64 },
MultiArmedBandit,
}
#[derive(Debug)]
pub struct TopologyOptimizer {
pub target_clustering: f64,
pub target_path_length: f64,
pub min_connectivity: usize,
pub last_optimization: Instant,
pub optimization_interval: Duration,
pub metrics_history: VecDeque<TopologyMetrics>,
}
impl TopologyOptimizer {
pub fn new() -> Self {
Self {
target_clustering: 0.3,
target_path_length: 4.0,
min_connectivity: 3,
last_optimization: Instant::now(),
optimization_interval: Duration::from_secs(300),
metrics_history: VecDeque::with_capacity(100),
}
}
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct TopologyMetrics {
timestamp: Instant,
clustering_coefficient: f64,
avg_path_length: f64,
diameter: Option<usize>,
connected_components: usize,
largest_component_size: usize,
small_world_coefficient: f64,
}
#[derive(Debug)]
pub struct HealthChecker {
pub check_interval: Duration,
pub check_timeout: Duration,
pub last_checks: HashMap<LibP2PPeerId, Instant>,
pub health_results: HashMap<LibP2PPeerId, HealthCheckResult>,
pub config: HealthCheckConfig,
}
impl HealthChecker {
pub fn new(check_interval: Duration) -> Self {
Self {
check_interval,
check_timeout: Duration::from_secs(5),
last_checks: HashMap::new(),
health_results: HashMap::new(),
config: HealthCheckConfig::default(),
}
}
}
impl Default for HealthCheckConfig {
fn default() -> Self {
Self {
enable_ping: true,
enable_capability_check: true,
enable_performance_monitoring: true,
unhealthy_multiplier: 2.0,
failure_threshold: 3,
}
}
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct HealthCheckConfig {
enable_ping: bool,
enable_capability_check: bool,
enable_performance_monitoring: bool,
unhealthy_multiplier: f64,
failure_threshold: usize,
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct HealthCheckResult {
is_healthy: bool,
response_time: Option<Duration>,
last_successful: Option<Instant>,
consecutive_failures: usize,
check_details: HashMap<String, bool>,
errors: Vec<String>,
}
#[derive(Debug)]
pub struct PerformanceMonitor {
pub collection_interval: Duration,
pub last_collection: Instant,
pub system_metrics: SystemPerformanceMetrics,
pub peer_metrics: HashMap<LibP2PPeerId, PeerPerformanceMetrics>,
pub alerts: VecDeque<PerformanceAlert>,
}
impl PerformanceMonitor {
pub fn new() -> Self {
Self {
collection_interval: Duration::from_secs(60),
last_collection: Instant::now(),
system_metrics: SystemPerformanceMetrics::default(),
peer_metrics: HashMap::new(),
alerts: VecDeque::with_capacity(100),
}
}
}
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct SystemPerformanceMetrics {
discovery_rate: f64,
connection_success_rate: f64,
avg_connection_time: Duration,
memory_usage_bytes: usize,
cpu_usage_percent: f64,
network_utilization_bps: u64,
dht_overhead_percent: f64,
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PerformanceAlert {
timestamp: Instant,
severity: AlertSeverity,
category: AlertCategory,
message: String,
peer_id: Option<LibP2PPeerId>,
suggested_actions: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
Info,
Warning,
Error,
Critical,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlertCategory {
Performance,
Connectivity,
Security,
Resource,
Configuration,
}
impl KademliaPeerDiscovery {
pub fn new(config: DiscoveryConfig) -> Self {
let max_connections = config.max_concurrent_connections;
Self {
discovered_peers: Arc::new(RwLock::new(HashMap::new())),
static_peers: HashSet::new(),
bootstrap_completed: false,
discovery_active: false,
event_tx: None,
bootstrap_tried: HashSet::new(),
last_discovery: None,
dark_resolver: Arc::new(DarkResolver::new()),
shadow_handler: DefaultShadowAddressHandler::new(
NetworkType::Mainnet,
[0u8; 32], ),
dht_buckets: Arc::new(RwLock::new(BTreeMap::new())),
connection_semaphore: Arc::new(Semaphore::new(max_connections)),
load_balancer: Arc::new(Mutex::new(LoadBalancer::new(
config.load_balancing_config.algorithm.clone(),
))),
peer_selector: Arc::new(Mutex::new(PeerSelector::new(
config.geo_preferences.clone(),
))),
topology_optimizer: Arc::new(Mutex::new(TopologyOptimizer::new())),
health_checker: Arc::new(Mutex::new(HealthChecker::new(
config.load_balancing_config.health_check_interval,
))),
performance_monitor: Arc::new(Mutex::new(PerformanceMonitor::new())),
bootstrap_strategy: BootstrapStrategy::Adaptive {
aggressiveness: 0.5,
last_adapted: Instant::now(),
},
config,
}
}
pub fn set_event_channel(&mut self, tx: mpsc::Sender<DiscoveryEvent>) {
self.event_tx = Some(tx);
}
pub async fn start(&mut self) -> Result<(), NetworkError> {
if self.discovery_active {
return Ok(());
}
self.discovery_active = true;
info!("Starting Kademlia peer discovery service");
self.bootstrap().await?;
self.start_periodic_discovery().await;
Ok(())
}
pub async fn stop(&mut self) -> Result<(), NetworkError> {
self.discovery_active = false;
info!("Stopping Kademlia peer discovery service");
Ok(())
}
async fn bootstrap(&mut self) -> Result<(), NetworkError> {
if self.bootstrap_completed {
return Ok(());
}
info!(
"Starting DHT bootstrap with {} nodes",
self.config.bootstrap_nodes.len()
);
let start_time = Instant::now();
let mut discovered_peers = 0;
for bootstrap_addr in &self.config.bootstrap_nodes {
if self.bootstrap_tried.contains(bootstrap_addr) {
continue;
}
self.bootstrap_tried.insert(*bootstrap_addr);
let peer_id = LibP2PPeerId::random(); let discovered_peer =
DiscoveredPeer::new(peer_id, *bootstrap_addr, DiscoveryMethod::Bootstrap);
self.discovered_peers
.write()
.await
.insert(peer_id, discovered_peer.clone());
discovered_peers += 1;
if let Some(tx) = &self.event_tx {
let _ = tx
.send(DiscoveryEvent::PeerDiscovered(discovered_peer))
.await;
}
debug!("Added bootstrap peer: {} -> {:?}", bootstrap_addr, peer_id);
}
self.bootstrap_completed = true;
if let Some(tx) = &self.event_tx {
let _ = tx
.send(DiscoveryEvent::BootstrapCompleted {
peers_discovered: discovered_peers,
duration: start_time.elapsed(),
success_rate: discovered_peers as f64
/ self.config.bootstrap_nodes.len().max(1) as f64,
})
.await;
}
info!("DHT bootstrap completed");
Ok(())
}
async fn start_periodic_discovery(&mut self) {
let interval = Duration::from_secs(self.config.interval);
let discovered_peers = Arc::clone(&self.discovered_peers);
let event_tx = self.event_tx.clone();
let methods = self.config.methods.clone();
let max_peers = self.config.max_peers;
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
loop {
interval_timer.tick().await;
for method in &methods {
match method {
DiscoveryMethod::Kademlia => {
Self::discover_kademlia_peers(&discovered_peers, &event_tx, max_peers)
.await;
}
DiscoveryMethod::Mdns => {
Self::discover_mdns_peers(&discovered_peers, &event_tx).await;
}
_ => {} }
}
}
});
}
async fn discover_kademlia_peers(
discovered_peers: &Arc<RwLock<HashMap<LibP2PPeerId, DiscoveredPeer>>>,
event_tx: &Option<mpsc::Sender<DiscoveryEvent>>,
max_peers: usize,
) {
let current_count = discovered_peers.read().await.len();
if current_count >= max_peers {
return;
}
let peers_to_discover = (max_peers - current_count).min(5);
for _ in 0..peers_to_discover {
let peer_id = LibP2PPeerId::random();
let address =
SocketAddr::from(([192, 168, 1, 100], 8000 + rand::random::<u16>() % 1000));
let discovered_peer = DiscoveredPeer::new(peer_id, address, DiscoveryMethod::Kademlia);
discovered_peers
.write()
.await
.insert(peer_id, discovered_peer.clone());
if let Some(tx) = event_tx {
let _ = tx
.send(DiscoveryEvent::PeerDiscovered(discovered_peer))
.await;
}
debug!("Discovered peer via Kademlia: {:?} at {}", peer_id, address);
}
}
async fn discover_mdns_peers(
discovered_peers: &Arc<RwLock<HashMap<LibP2PPeerId, DiscoveredPeer>>>,
event_tx: &Option<mpsc::Sender<DiscoveryEvent>>,
) {
let local_peers = 2;
for _ in 0..local_peers {
let peer_id = LibP2PPeerId::random();
let address = SocketAddr::from(([192, 168, 1, 10 + rand::random::<u8>() % 50], 8000));
if discovered_peers.read().await.contains_key(&peer_id) {
continue;
}
let discovered_peer = DiscoveredPeer::new(peer_id, address, DiscoveryMethod::Mdns);
discovered_peers
.write()
.await
.insert(peer_id, discovered_peer.clone());
if let Some(tx) = event_tx {
let _ = tx
.send(DiscoveryEvent::PeerDiscovered(discovered_peer))
.await;
}
debug!("Discovered peer via mDNS: {:?} at {}", peer_id, address);
}
}
pub async fn get_discovered_peers(&self) -> Vec<DiscoveredPeer> {
self.discovered_peers
.read()
.await
.values()
.cloned()
.collect()
}
pub async fn get_connectable_peers(&self) -> Vec<DiscoveredPeer> {
self.discovered_peers
.read()
.await
.values()
.filter(|peer| peer.should_attempt_connection())
.cloned()
.collect()
}
pub async fn update_peer_reputation(&self, peer_id: LibP2PPeerId, delta: f64) {
if let Some(peer) = self.discovered_peers.write().await.get_mut(&peer_id) {
let old_reputation = peer.reputation;
peer.reputation += delta;
peer.reputation = peer.reputation.clamp(-50.0, 50.0);
if let Some(tx) = &self.event_tx {
let _ = tx
.send(DiscoveryEvent::ReputationUpdated {
peer_id,
old_reputation,
new_reputation: peer.reputation,
reason: "Connection update".to_string(),
})
.await;
}
}
}
pub async fn record_connection_attempt(&self, peer_id: LibP2PPeerId, success: bool) {
if let Some(peer) = self.discovered_peers.write().await.get_mut(&peer_id) {
peer.record_connection_attempt(success, &self.config.scoring_config);
if success {
info!("Successful connection to peer: {:?}", peer_id);
} else {
warn!(
"Failed connection to peer: {:?} (attempts: {})",
peer_id, peer.connection_attempts
);
}
}
}
pub fn add_static_peer(&mut self, address: SocketAddr) {
self.static_peers.insert(address);
info!("Added static peer: {}", address);
}
pub async fn cleanup_old_peers(&self) {
let cutoff = Instant::now() - Duration::from_secs(3600);
self.discovered_peers.write().await.retain(|peer_id, peer| {
let keep = peer.discovered_at > cutoff;
if !keep {
debug!("Removing old discovered peer: {:?}", peer_id);
}
keep
});
}
pub async fn get_discovery_stats(&self) -> DiscoveryStats {
let peers = self.discovered_peers.read().await;
let total_peers = peers.len();
let mut method_counts = HashMap::new();
let mut avg_reputation = 0.0;
let mut connectable_count = 0;
for peer in peers.values() {
*method_counts
.entry(peer.discovery_method.clone())
.or_insert(0) += 1;
avg_reputation += peer.reputation;
if peer.should_attempt_connection() {
connectable_count += 1;
}
}
if total_peers > 0 {
avg_reputation /= total_peers as f64;
}
DiscoveryStats {
total_discovered: total_peers,
connectable_peers: connectable_count,
method_counts,
average_reputation: avg_reputation,
bootstrap_completed: self.bootstrap_completed,
}
}
}
#[derive(Debug, Clone)]
pub struct DiscoveryStats {
pub total_discovered: usize,
pub connectable_peers: usize,
pub method_counts: HashMap<DiscoveryMethod, usize>,
pub average_reputation: f64,
pub bootstrap_completed: bool,
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::timeout;
#[tokio::test]
async fn test_discovery_creation() {
let config = DiscoveryConfig::default();
let discovery = KademliaPeerDiscovery::new(config);
assert!(!discovery.discovery_active);
assert!(!discovery.bootstrap_completed);
}
#[tokio::test]
async fn test_bootstrap() {
let mut config = DiscoveryConfig::default();
config.bootstrap_nodes = vec![SocketAddr::from(([127, 0, 0, 1], 8000))];
let mut discovery = KademliaPeerDiscovery::new(config);
let (tx, mut rx) = mpsc::channel(10);
discovery.set_event_channel(tx);
discovery.bootstrap().await.unwrap();
assert!(discovery.bootstrap_completed);
let event = timeout(Duration::from_millis(100), rx.recv()).await;
assert!(event.is_ok());
}
#[tokio::test]
async fn test_peer_reputation() {
let config = DiscoveryConfig::default();
let discovery = KademliaPeerDiscovery::new(config);
let peer_id = LibP2PPeerId::random();
let address = SocketAddr::from(([127, 0, 0, 1], 8000));
let peer = DiscoveredPeer::new(peer_id, address, DiscoveryMethod::Kademlia);
discovery
.discovered_peers
.write()
.await
.insert(peer_id, peer);
discovery.update_peer_reputation(peer_id, 5.0).await;
let peers = discovery.get_discovered_peers().await;
assert_eq!(peers[0].reputation, 5.0);
}
#[tokio::test]
async fn test_connection_attempts() {
let config = DiscoveryConfig::default();
let discovery = KademliaPeerDiscovery::new(config);
let peer_id = LibP2PPeerId::random();
let address = SocketAddr::from(([127, 0, 0, 1], 8000));
let peer = DiscoveredPeer::new(peer_id, address, DiscoveryMethod::Kademlia);
discovery
.discovered_peers
.write()
.await
.insert(peer_id, peer);
discovery.record_connection_attempt(peer_id, false).await;
let peers = discovery.get_discovered_peers().await;
assert_eq!(peers[0].connection_attempts, 1);
assert!(peers[0].reputation < 0.0);
}
}