#![deny(unsafe_code)]
use crate::types::{
ConnectionStatus, LatencyMetrics, NetworkError, NetworkMetrics, PeerId, QueueMetrics,
ThroughputMetrics,
};
use anyhow::Result;
use async_trait::async_trait;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use dashmap::DashMap;
use futures::future::Future;
use parking_lot::RwLock as ParkingRwLock;
use quinn::{Connection, Endpoint};
use ring::{aead, agreement, rand as ring_rand};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock as TokioRwLock, Semaphore};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
#[derive(Clone)]
pub struct SecureConfig {
pub transport_keys: TransportKeys,
pub timeout: std::time::Duration,
pub keepalive: std::time::Duration,
}
/// A key pair produced by `TransportKeys::generate`.
pub struct TransportKeys {
    /// Private half of the pair. It is not read anywhere in the code visible
    /// in this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    private_key: agreement::EphemeralPrivateKey,
    /// Raw public-key bytes computed from `private_key`.
    public_key: Vec<u8>,
}
/// Cloning does NOT copy the key pair: every clone is a freshly generated,
/// unrelated key pair (presumably because `EphemeralPrivateKey` cannot be
/// cloned).
///
/// NOTE(review): this means `keys.clone().public_key != keys.public_key`,
/// which breaks the usual expectation that a clone equals its source. Anything
/// that derives key material from `public_key` after cloning a config will see
/// a different key. Confirm this is intended.
impl Clone for TransportKeys {
fn clone(&self) -> Self {
// Deliberately ignores `self` and builds a brand-new pair.
Self::generate()
}
}
impl TransportKeys {
    /// Generates a fresh X25519 key pair using the system RNG.
    ///
    /// # Panics
    ///
    /// Panics if `ring` cannot generate the private key or cannot compute its
    /// public key. Neither is expected in normal operation; both point at a
    /// broken system RNG rather than a recoverable condition.
    pub fn generate() -> Self {
        let rng = ring_rand::SystemRandom::new();
        let private_key = agreement::EphemeralPrivateKey::generate(&agreement::X25519, &rng)
            .expect("system RNG failed to generate an X25519 private key");
        let public_key = private_key
            .compute_public_key()
            .expect("failed to compute the public key of a freshly generated X25519 private key")
            .as_ref()
            .to_vec();
        Self {
            private_key,
            public_key,
        }
    }
}
/// A QUIC connection together with the key material and local batching and
/// encryption state used by `send`/`receive`.
pub struct SecureConnection {
    /// Underlying `quinn` connection; not read by the code visible in this file.
    #[allow(dead_code)]
    connection: Connection,
    /// Key pair taken from the `SecureConfig` used at construction.
    #[allow(dead_code)]
    keys: TransportKeys,
    /// Batching, back-pressure and encryption state.
    channels: ConnectionChannels,
}
struct ConnectionChannels {
tx: mpsc::Sender<Bytes>,
rx: mpsc::Receiver<Bytes>,
batch_buffer: BytesMut,
batch_size: usize,
batch_timeout: std::time::Duration,
last_batch: std::time::Instant,
high_water_mark: usize,
low_water_mark: usize,
back_pressure: Arc<tokio::sync::Notify>,
queue_size: AtomicUsize,
key_cache: Arc<aead::LessSafeKey>,
nonce_counter: AtomicU64,
message_count: AtomicU64,
bytes_processed: AtomicU64,
}
impl SecureConnection {
pub async fn new(
endpoint: &Endpoint,
addr: SocketAddr,
config: SecureConfig,
) -> Result<Self, NetworkError> {
let connection = endpoint
.connect(addr, "qudag")
.map_err(|e| NetworkError::ConnectionError(e.to_string()))?
.await
.map_err(|e| NetworkError::ConnectionError(e.to_string()))?;
let (tx, rx) = mpsc::channel(65_536);
let key = aead::UnboundKey::new(
&aead::CHACHA20_POLY1305,
&config.transport_keys.public_key[..32],
)
.map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
let key_cache = Arc::new(aead::LessSafeKey::new(key));
Ok(Self {
connection,
keys: config.transport_keys,
channels: ConnectionChannels {
tx,
rx,
batch_buffer: BytesMut::with_capacity(1024 * 1024), batch_size: 128, batch_timeout: std::time::Duration::from_millis(50),
last_batch: std::time::Instant::now(),
high_water_mark: 64 * 1024 * 1024, low_water_mark: 32 * 1024 * 1024, back_pressure: Arc::new(tokio::sync::Notify::new()),
queue_size: AtomicUsize::new(0),
key_cache,
nonce_counter: AtomicU64::new(1),
message_count: AtomicU64::new(0),
bytes_processed: AtomicU64::new(0),
},
})
}
pub async fn send(&mut self, data: Bytes) -> Result<(), NetworkError> {
let msg_size = data.len();
if msg_size == 0 {
return Err(NetworkError::MessageError("Empty message".into()));
}
if msg_size > 1024 * 1024 {
return Err(NetworkError::MessageError("Message too large".into()));
}
let current_size = self.channels.queue_size.load(Ordering::Acquire);
if current_size >= self.channels.high_water_mark {
debug!("Applying back pressure, queue size: {}", current_size);
let back_pressure = self.channels.back_pressure.clone();
tokio::select! {
_ = back_pressure.notified() => {},
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
return Err(NetworkError::ConnectionError("Back pressure timeout".into()));
}
}
}
let nonce_value = self.channels.nonce_counter.fetch_add(1, Ordering::Relaxed);
if nonce_value == 0 {
error!("Nonce counter overflow - this should not happen in normal operation");
return Err(NetworkError::EncryptionError("Nonce overflow".into()));
}
let mut nonce_bytes = [0u8; 12];
nonce_bytes[..8].copy_from_slice(&nonce_value.to_le_bytes());
let mut encrypted = BytesMut::from(data.as_ref());
let mut retry_count = 0;
loop {
let nonce_attempt = aead::Nonce::assume_unique_for_key(nonce_bytes);
match self.channels.key_cache.seal_in_place_append_tag(
nonce_attempt,
aead::Aad::empty(),
&mut encrypted,
) {
Ok(()) => break,
Err(e) => {
retry_count += 1;
if retry_count >= 3 {
return Err(NetworkError::EncryptionError(format!(
"Encryption failed after {} retries: {}",
retry_count, e
)));
}
warn!("Encryption attempt {} failed, retrying: {}", retry_count, e);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
}
let encrypted_len = encrypted.len() as u32;
self.channels.batch_buffer.put_u32(encrypted_len);
self.channels.batch_buffer.extend_from_slice(&encrypted);
self.channels
.queue_size
.fetch_add(msg_size, Ordering::Release);
self.channels.message_count.fetch_add(1, Ordering::Relaxed);
self.channels
.bytes_processed
.fetch_add(msg_size as u64, Ordering::Relaxed);
if self.channels.batch_buffer.len() >= self.channels.batch_size * 1024
|| self.channels.last_batch.elapsed() >= self.channels.batch_timeout
{
self.flush_batch().await?
}
Ok(())
}
async fn flush_batch(&mut self) -> Result<(), NetworkError> {
if self.channels.batch_buffer.is_empty() {
return Ok(());
}
let batch_size = self.channels.batch_buffer.len();
let batch = self.channels.batch_buffer.split().freeze();
let mut retry_count = 0;
loop {
match self.channels.tx.send(batch.clone()).await {
Ok(()) => break,
Err(e) => {
retry_count += 1;
if retry_count >= 3 {
self.channels.batch_buffer.extend_from_slice(&batch);
return Err(NetworkError::ConnectionError(format!(
"Batch send failed after {} retries: {}",
retry_count, e
)));
}
warn!("Batch send attempt {} failed, retrying: {}", retry_count, e);
tokio::time::sleep(std::time::Duration::from_millis(100 * retry_count as u64))
.await;
}
}
}
let new_size = self
.channels
.queue_size
.fetch_sub(batch_size, Ordering::AcqRel);
if new_size <= self.channels.low_water_mark {
self.channels.back_pressure.notify_waiters();
debug!("Released back pressure, queue size: {}", new_size);
}
self.channels.last_batch = std::time::Instant::now();
Ok(())
}
pub async fn receive(&mut self) -> Result<Vec<Bytes>, NetworkError> {
let encrypted_batch = self
.channels
.rx
.recv()
.await
.ok_or_else(|| NetworkError::ConnectionError("Channel closed".into()))?;
let mut messages = Vec::new();
let mut buf = encrypted_batch;
while buf.has_remaining() {
if buf.remaining() < 4 {
return Err(NetworkError::EncryptionError(
"Incomplete message length".into(),
));
}
let msg_len = buf.get_u32() as usize;
if buf.remaining() < msg_len {
return Err(NetworkError::EncryptionError(
"Incomplete message data".into(),
));
}
let encrypted_data = buf.copy_to_bytes(msg_len);
let nonce_value = self.channels.nonce_counter.load(Ordering::Relaxed);
let mut nonce_bytes = [0u8; 12];
nonce_bytes[..8].copy_from_slice(&nonce_value.to_le_bytes());
let nonce = aead::Nonce::assume_unique_for_key(nonce_bytes);
let mut message_data = BytesMut::from(encrypted_data.as_ref());
self.channels
.key_cache
.open_in_place(nonce, aead::Aad::empty(), &mut message_data)
.map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
if message_data.len() >= 16 {
message_data.truncate(message_data.len() - 16);
}
messages.push(message_data.freeze());
}
Ok(messages)
}
}
/// Tracks peer connections and coordinates pooling, circuit breaking, health
/// checks, retries, load balancing and metrics.
pub struct ConnectionManager {
    /// Upper bound on entries in `connections`, enforced by `connect`.
    max_connections: usize,
    /// Live connection records, keyed by peer.
    connections: Arc<DashMap<PeerId, ConnectionInfo>>,
    /// Healthy connections parked by `disconnect`, with the time they were
    /// parked; `connect` reuses them until `pool_timeout` elapses.
    connection_pool: Arc<DashMap<PeerId, (ConnectionInfo, Instant)>>,
    /// Pooled connections counted by `get_connection_statistics`.
    enhanced_pool: Arc<DashMap<PeerId, PooledConnection>>,
    /// Stream multiplexer used by the stream APIs.
    multiplexer: Arc<ConnectionMultiplexer>,
    /// Drives `retry_connect`.
    retry_manager: Arc<RetryManager>,
    /// Picks a peer for `select_best_connection`.
    load_balancer: Arc<LoadBalancer>,
    /// Runs per-peer health checks.
    health_monitor: Arc<HealthMonitor>,
    /// Handles connection warming.
    warming_manager: Arc<WarmingManager>,
    /// How long a pooled connection stays reusable.
    pool_timeout: Duration,
    /// Aggregate network metrics.
    metrics: Arc<ParkingRwLock<NetworkMetrics>>,
    /// Pool occupancy metrics.
    queue_metrics: Arc<ParkingRwLock<QueueMetrics>>,
    /// Latency metrics.
    latency_metrics: Arc<ParkingRwLock<LatencyMetrics>>,
    /// Throughput metrics.
    throughput_metrics: Arc<ParkingRwLock<ThroughputMetrics>>,
    /// Health bookkeeping; not read by the code visible in this file.
    #[allow(dead_code)]
    health_tracker: Arc<RwLock<ConnectionHealthTracker>>,
    /// Per-peer circuit breakers consulted by `connect`.
    circuit_breakers: Arc<DashMap<PeerId, CircuitBreaker>>,
    /// Latest quality score per connected peer.
    quality_scores: Arc<DashMap<PeerId, f64>>,
    /// Background maintenance task; always `None` from `with_pool_timeout`.
    #[allow(dead_code)]
    maintenance_handle: Option<tokio::task::JoinHandle<()>>,
    /// Configured limits; `max_total` mirrors `max_connections`.
    connection_limits: ConnectionLimits,
    /// Monitoring period; not read by the code visible in this file.
    #[allow(dead_code)]
    monitoring_interval: Duration,
}
/// Per-peer connection record with activity statistics and a quality score.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
    /// Current connection status.
    pub status: ConnectionStatus,
    /// When this record was created.
    pub established_at: Instant,
    /// When activity was last recorded (initially the creation time).
    pub last_activity: Instant,
    /// Number of operations recorded as successful.
    pub success_count: u64,
    /// Number of operations recorded as failed.
    pub failure_count: u64,
    /// Moving average of the response times passed to `update_activity`.
    pub avg_response_time: Duration,
    /// Score clamped to `[0.0, 1.0]`, derived from success rate and response time.
    pub quality_score: f64,
    /// Total bytes reported through `update_activity`.
    pub bandwidth_usage: u64,
    /// Free-form annotations; not read by the code visible in this file.
    pub metadata: HashMap<String, String>,
}
impl ConnectionInfo {
    /// Weight of the newest sample in the response-time moving average.
    const RESPONSE_TIME_ALPHA: f64 = 0.1;

    /// Creates a record with the given status, no recorded operations, zero
    /// average response time and a perfect (1.0) quality score.
    ///
    /// `established_at` and `last_activity` share the same instant.
    pub fn new(status: ConnectionStatus) -> Self {
        let now = Instant::now();
        Self {
            status,
            established_at: now,
            last_activity: now,
            success_count: 0,
            failure_count: 0,
            avg_response_time: Duration::ZERO,
            quality_score: 1.0,
            bandwidth_usage: 0,
            metadata: HashMap::new(),
        }
    }

    /// Records one operation and recomputes the quality score.
    ///
    /// This refreshes `last_activity`, adds `bytes_transferred` to
    /// `bandwidth_usage`, and bumps the success or failure counter. It then
    /// folds `response_time` into the moving average.
    ///
    /// The first recorded operation seeds the average directly. Later samples
    /// are blended in with weight [`Self::RESPONSE_TIME_ALPHA`]. Counters
    /// saturate instead of overflowing.
    pub fn update_activity(
        &mut self,
        success: bool,
        response_time: Duration,
        bytes_transferred: u64,
    ) {
        let is_first_sample = self.success_count == 0 && self.failure_count == 0;

        self.last_activity = Instant::now();
        self.bandwidth_usage = self.bandwidth_usage.saturating_add(bytes_transferred);
        if success {
            self.success_count = self.success_count.saturating_add(1);
        } else {
            self.failure_count = self.failure_count.saturating_add(1);
        }

        self.avg_response_time = if is_first_sample {
            // Blending into the initial zero would bias the average low.
            response_time
        } else {
            // f64 seconds keep sub-millisecond precision; whole-millisecond
            // arithmetic would round small samples down to zero.
            let alpha = Self::RESPONSE_TIME_ALPHA;
            let blended = alpha * response_time.as_secs_f64()
                + (1.0 - alpha) * self.avg_response_time.as_secs_f64();
            Duration::from_secs_f64(blended)
        };

        self.update_quality_score();
    }

    /// Recomputes `quality_score` as the success rate minus a response-time
    /// penalty, clamped to `[0.0, 1.0]`.
    ///
    /// The penalty is 0.2 per second of average response time. It applies
    /// only when the average exceeds 100 ms (whole milliseconds). With no
    /// recorded operations the score is 1.0.
    fn update_quality_score(&mut self) {
        let total_ops = self.success_count.saturating_add(self.failure_count);
        if total_ops == 0 {
            self.quality_score = 1.0;
            return;
        }
        let success_rate = self.success_count as f64 / total_ops as f64;
        let avg_ms = self.avg_response_time.as_millis();
        let response_penalty = if avg_ms > 100 {
            0.2 * (avg_ms as f64 / 1000.0)
        } else {
            0.0
        };
        self.quality_score = (success_rate - response_penalty).clamp(0.0, 1.0);
    }

    /// Healthy means a quality score above 0.5 and activity within the last
    /// five minutes.
    pub fn is_healthy(&self) -> bool {
        self.quality_score > 0.5 && self.last_activity.elapsed() < Duration::from_secs(300)
    }
}
/// Bookkeeping for periodic connection health checks.
///
/// NOTE(review): none of these fields is read by the code visible in this
/// file (hence the `dead_code` allowances); their intended use is unconfirmed.
#[derive(Debug)]
#[allow(dead_code)]
pub struct ConnectionHealthTracker {
    check_interval: Duration,
    last_check: Option<Instant>,
    unhealthy_connections: HashMap<PeerId, UnhealthyConnectionInfo>,
    health_stats: HealthStatistics,
}

/// Details kept for a connection considered unhealthy.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct UnhealthyConnectionInfo {
    unhealthy_since: Instant,
    recovery_attempts: u32,
    last_recovery_attempt: Option<Instant>,
    reason: String,
}

/// Counters summarising health checks and recoveries; all zero by default.
#[derive(Debug, Clone, Default)]
pub struct HealthStatistics {
    pub total_checks: u64,
    pub healthy_count: u64,
    pub unhealthy_count: u64,
    pub successful_recoveries: u64,
    pub avg_recovery_time: Duration,
}

impl Default for ConnectionHealthTracker {
    /// A 30 s check interval, no check performed yet, no tracked connections
    /// and zeroed statistics.
    fn default() -> Self {
        let check_interval = Duration::from_secs(30);
        ConnectionHealthTracker {
            check_interval,
            last_check: None,
            unhealthy_connections: HashMap::default(),
            health_stats: Default::default(),
        }
    }
}
/// Per-peer circuit breaker that refuses requests after repeated failures.
///
/// * `Closed`: every request is allowed. `failure_threshold` consecutive
///   failures trip the breaker to `Open`.
/// * `Open`: requests are refused until `timeout` has passed since the trip.
///   The next [`CircuitBreaker::allow_request`] then moves it to `HalfOpen`.
///   Results recorded while open are ignored.
/// * `HalfOpen`: requests are allowed. `success_threshold` successes close the
///   breaker; any failure re-opens it.
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
    /// Current state.
    state: CircuitBreakerState,
    /// Consecutive failures (while closed) that open the breaker.
    failure_threshold: u32,
    /// Failures counted since the last reset.
    failure_count: u32,
    /// When the breaker last opened; `None` until it first trips.
    opened_at: Option<Instant>,
    /// How long the breaker stays open before a trial request is allowed.
    timeout: Duration,
    /// Successes (while half-open) that close the breaker again.
    success_threshold: u32,
    /// Successes counted since entering the half-open state.
    success_count: u32,
}

/// The three states of a [`CircuitBreaker`].
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}

impl Default for CircuitBreaker {
    /// Closed breaker: opens after 5 failures, stays open for 60 s, and
    /// closes again after 3 half-open successes.
    fn default() -> Self {
        CircuitBreaker {
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(60),
            state: CircuitBreakerState::Closed,
            failure_count: 0,
            success_count: 0,
            opened_at: None,
        }
    }
}

impl CircuitBreaker {
    /// Returns whether a request may proceed.
    ///
    /// When the breaker is open and its timeout has expired, it moves to
    /// `HalfOpen` (resetting the success counter) and allows the request.
    pub fn allow_request(&mut self) -> bool {
        match self.state {
            CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => true,
            CircuitBreakerState::Open => {
                let timed_out = self
                    .opened_at
                    .map_or(false, |opened_at| opened_at.elapsed() >= self.timeout);
                if timed_out {
                    self.state = CircuitBreakerState::HalfOpen;
                    self.success_count = 0;
                }
                timed_out
            }
        }
    }

    /// Feeds the outcome of a request into the state machine.
    pub fn record_result(&mut self, success: bool) {
        let current = self.state.clone();
        match (current, success) {
            // Results that arrive while open are ignored.
            (CircuitBreakerState::Open, _) => {}
            (CircuitBreakerState::Closed, true) => self.failure_count = 0,
            (CircuitBreakerState::Closed, false) => {
                self.failure_count += 1;
                if self.failure_count >= self.failure_threshold {
                    self.trip();
                }
            }
            (CircuitBreakerState::HalfOpen, true) => {
                self.success_count += 1;
                if self.success_count >= self.success_threshold {
                    self.state = CircuitBreakerState::Closed;
                    self.failure_count = 0;
                }
            }
            (CircuitBreakerState::HalfOpen, false) => {
                self.trip();
                self.failure_count += 1;
            }
        }
    }

    /// Moves to `Open` and stamps the opening time.
    fn trip(&mut self) {
        self.state = CircuitBreakerState::Open;
        self.opened_at = Some(Instant::now());
    }
}
/// A connection entry held in the manager's enhanced pool.
///
/// NOTE(review): the fields below are not read by the code visible in this
/// file; their meanings are inferred from names — confirm before relying on them.
#[derive(Debug, Clone)]
pub struct PooledConnection {
    pub info: ConnectionInfo,
    pub created_at: Instant,
    pub last_used: Instant,
    pub usage_count: u64,
    pub weight: f64,
    pub max_streams: u32,
    pub active_streams: u32,
    pub warming_state: WarmingState,
    pub affinity_group: Option<String>,
}

/// Warming status of a peer's connection; `FailedToWarm` carries a reason.
#[derive(Debug, Clone, PartialEq)]
pub enum WarmingState {
    Cold,
    Warming,
    Warm,
    FailedToWarm(String),
}
/// Multiplexes prioritised streams over peer connections.
#[derive(Debug)]
pub struct ConnectionMultiplexer {
    /// Multiplexed connection state per peer.
    connections: Arc<DashMap<PeerId, MultiplexedConnection>>,
    /// Which peer each stream belongs to.
    stream_routes: Arc<DashMap<StreamId, PeerId>>,
    /// Stream ids queued per priority level.
    priority_queue: Arc<TokioRwLock<BTreeMap<Priority, VecDeque<StreamId>>>>,
    /// Stream cap per connection.
    max_streams_per_connection: u32,
    /// Stream timeout; not read by the code visible in this file.
    #[allow(dead_code)]
    stream_timeout: Duration,
}
/// Identifier of a multiplexed stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct StreamId(pub u64);

/// Scheduling priority of a stream.
///
/// The derived ordering follows declaration order, which matches the explicit
/// discriminants: `Critical < High < Normal < Low`. The most urgent priority
/// therefore sorts first, e.g. as a `BTreeMap` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    /// Most urgent.
    Critical = 0,
    High = 1,
    Normal = 2,
    /// Least urgent.
    Low = 3,
}
/// A peer connection carrying several streams.
#[derive(Debug)]
pub struct MultiplexedConnection {
    /// Record of the underlying connection.
    pub info: ConnectionInfo,
    /// Streams open on this connection.
    pub streams: HashMap<StreamId, StreamInfo>,
    /// Next raw stream id to hand out (presumably; not used in visible code).
    pub next_stream_id: u64,
    /// Utilisation figure (semantics not shown in this file).
    pub utilization: f64,
    /// Semaphore guarding stream slots (presumably the per-connection cap).
    pub stream_semaphore: Arc<Semaphore>,
}
/// State of one multiplexed stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    pub id: StreamId,
    pub priority: Priority,
    /// Lifecycle state; `send_stream_data` requires `StreamState::Active`.
    pub state: StreamState,
    pub created_at: Instant,
    pub last_activity: Instant,
    pub bytes_transferred: u64,
}

/// Lifecycle of a stream; `Error` carries a description.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamState {
    Opening,
    Active,
    HalfClosedLocal,
    HalfClosedRemote,
    Closed,
    Error(String),
}
/// Runs operations with retries, optionally configured per peer.
#[derive(Debug)]
pub struct RetryManager {
    /// Per-peer overrides of `default_config`.
    retry_configs: Arc<DashMap<PeerId, RetryConfig>>,
    /// Configuration used when a peer has no override.
    default_config: RetryConfig,
    /// Aggregate statistics, reported by `get_connection_statistics`.
    stats: Arc<TokioRwLock<RetryStats>>,
}

/// Retry policy parameters (backoff bounds, multiplier, jitter, timeout).
#[derive(Debug, Clone)]
pub struct RetryConfig {
    pub max_retries: u32,
    pub initial_backoff: Duration,
    pub max_backoff: Duration,
    pub backoff_multiplier: f64,
    pub jitter_factor: f64,
    pub timeout: Duration,
}

/// Retry counters; all zero by default.
#[derive(Debug, Clone, Default)]
pub struct RetryStats {
    pub total_attempts: u64,
    pub successful_retries: u64,
    pub failed_retries: u64,
    pub avg_retry_duration: Duration,
}
/// Chooses a peer among candidates according to a strategy.
#[derive(Debug)]
pub struct LoadBalancer {
    /// Strategy in use; `with_pool_timeout` picks `WeightedRoundRobin`.
    strategy: LoadBalancingStrategy,
    /// Per-peer weights.
    weights: Arc<DashMap<PeerId, f64>>,
    /// Counter for round-robin selection.
    round_robin_counter: AtomicU64,
    /// Aggregate statistics, reported by `get_connection_statistics`.
    stats: Arc<TokioRwLock<LoadBalancingStats>>,
}

/// Available peer-selection strategies.
#[derive(Debug, Clone)]
pub enum LoadBalancingStrategy {
    RoundRobin,
    LeastConnections,
    WeightedRoundRobin,
    ResponseTime,
    ResourceUtilization,
}

/// Load-balancing counters; empty by default.
#[derive(Debug, Clone, Default)]
pub struct LoadBalancingStats {
    pub total_requests: u64,
    pub peer_distribution: HashMap<PeerId, u64>,
    pub avg_response_times: HashMap<PeerId, Duration>,
}
/// Schedules and records per-peer health checks.
#[derive(Debug)]
pub struct HealthMonitor {
    /// Check configuration.
    config: HealthCheckConfig,
    /// Latest result per peer.
    results: Arc<DashMap<PeerId, HealthCheckResult>>,
    /// Per-peer scheduling state.
    scheduler: Arc<TokioRwLock<HealthCheckScheduler>>,
    /// Aggregate statistics, reported by `get_connection_statistics`.
    stats: Arc<TokioRwLock<HealthStats>>,
}

/// Health-check parameters.
#[derive(Debug, Clone)]
pub struct HealthCheckConfig {
    pub interval: Duration,
    pub timeout: Duration,
    pub failure_threshold: u32,
    pub recovery_threshold: u32,
    pub check_type: HealthCheckType,
}

/// Kind of probe to run.
#[derive(Debug, Clone)]
pub enum HealthCheckType {
    Ping,
    Application,
    Custom,
}

/// Outcome of a single health check.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    /// When the result was produced.
    pub timestamp: Instant,
    pub success: bool,
    pub response_time: Duration,
    /// Failure description; `None` on success.
    pub error: Option<String>,
    /// Score set by the check (`PingHealthCheck` uses 1.0 or 0.0).
    pub health_score: f64,
}

/// Next check time and interval per peer.
#[derive(Debug)]
pub struct HealthCheckScheduler {
    scheduled_checks: HashMap<PeerId, Instant>,
    check_intervals: HashMap<PeerId, Duration>,
}

/// Health-check counters; all zero by default.
#[derive(Debug, Clone, Default)]
pub struct HealthStats {
    pub total_checks: u64,
    pub successful_checks: u64,
    pub failed_checks: u64,
    pub avg_response_time: Duration,
}
/// Tracks and drives connection warming per peer.
#[derive(Debug)]
pub struct WarmingManager {
    /// Warming configuration.
    config: WarmingConfig,
    /// Current warming state per peer.
    warming_states: Arc<DashMap<PeerId, WarmingState>>,
    /// Background warming tasks; not read by the code visible in this file.
    #[allow(dead_code)]
    warming_tasks: Arc<DashMap<PeerId, tokio::task::JoinHandle<()>>>,
    /// Aggregate statistics, reported by `get_connection_statistics`.
    stats: Arc<TokioRwLock<WarmingStats>>,
}

/// Warming parameters.
#[derive(Debug, Clone)]
pub struct WarmingConfig {
    pub enabled: bool,
    pub min_pool_size: usize,
    pub warming_timeout: Duration,
    pub warming_retries: u32,
    pub predictive_threshold: f64,
}

/// Warming counters; all zero by default.
#[derive(Debug, Clone, Default)]
pub struct WarmingStats {
    pub total_attempts: u64,
    pub successful_warmings: u64,
    pub failed_warmings: u64,
    pub avg_warming_time: Duration,
}
/// Connection limits; `max_total` is mirrored into the manager's
/// `max_connections`.
#[derive(Debug, Clone)]
pub struct ConnectionLimits {
    pub max_total: usize,
    pub max_per_peer: usize,
    pub max_idle: usize,
    pub connection_timeout: Duration,
    pub idle_timeout: Duration,
}

impl Default for ConnectionLimits {
    /// 1000 connections in total, 10 per peer, 100 idle, a 30 s connect
    /// timeout and a 5 min idle timeout.
    fn default() -> Self {
        let connection_timeout = Duration::from_secs(30);
        let idle_timeout = Duration::from_secs(5 * 60);
        ConnectionLimits {
            max_total: 1000,
            max_per_peer: 10,
            max_idle: 100,
            connection_timeout,
            idle_timeout,
        }
    }
}
/// A pluggable health probe for one peer connection.
#[async_trait]
pub trait HealthCheck: Send + Sync {
    /// Probes `peer_id`, whose current record is `connection`, and reports the outcome.
    async fn check(&self, peer_id: &PeerId, connection: &ConnectionInfo) -> HealthCheckResult;
}
/// Health check that reports a "ping" outcome for a connection.
#[derive(Debug)]
pub struct PingHealthCheck {
    /// Configured timeout (5 s by default); not read by the check
    /// implementation visible in this file.
    #[allow(dead_code)]
    timeout: Duration,
}

impl Default for PingHealthCheck {
    fn default() -> Self {
        const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
        PingHealthCheck {
            timeout: DEFAULT_TIMEOUT,
        }
    }
}
#[async_trait]
impl HealthCheck for PingHealthCheck {
    /// Produces a ping result for `connection`.
    ///
    /// NOTE(review): no packet is sent. The outcome is
    /// `connection.is_healthy()` combined with a random ~10% failure, so
    /// `response_time` is effectively zero.
    async fn check(&self, _peer_id: &PeerId, connection: &ConnectionInfo) -> HealthCheckResult {
        let started = Instant::now();
        // `&&` short-circuits: the RNG is only consulted for healthy connections.
        let success = connection.is_healthy() && rand::random::<f64>() > 0.1;
        let response_time = started.elapsed();
        let (error, health_score) = if success {
            (None, 1.0)
        } else {
            (Some("Ping timeout".to_string()), 0.0)
        };
        HealthCheckResult {
            timestamp: Instant::now(),
            success,
            response_time,
            error,
            health_score,
        }
    }
}
use std::collections::{BTreeMap, HashMap, VecDeque};
use tokio::sync::RwLock;
impl ConnectionManager {
pub async fn recover_connection(&self, peer_id: &PeerId) -> Result<(), NetworkError> {
debug!("Attempting to recover connection for peer {:?}", peer_id);
self.connections.remove(peer_id);
self.connection_pool.remove(peer_id);
let mut retry_count = 0;
let max_retries = 5;
while retry_count < max_retries {
match self.connect(*peer_id).await {
Ok(()) => {
info!("Successfully recovered connection for peer {:?}", peer_id);
return Ok(());
}
Err(e) => {
retry_count += 1;
let backoff_ms = 100u64 * (1 << retry_count); warn!(
"Connection recovery attempt {} failed for peer {:?}: {}, retrying in {}ms",
retry_count, peer_id, e, backoff_ms
);
if retry_count >= max_retries {
error!(
"Failed to recover connection for peer {:?} after {} attempts",
peer_id, max_retries
);
return Err(NetworkError::ConnectionError(format!(
"Recovery failed after {} attempts",
max_retries
)));
}
tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
}
}
}
Err(NetworkError::ConnectionError("Max retries exceeded".into()))
}
/// Lists peers whose connection is `Failed` or `Disconnected`, or whose
/// record fails [`ConnectionInfo::is_healthy`]. Never returns `Err`.
pub async fn health_check(&self) -> Result<Vec<PeerId>, NetworkError> {
    let unhealthy_peers: Vec<PeerId> = self
        .connections
        .iter()
        .filter_map(|entry| {
            let peer_id = *entry.key();
            let info = entry.value();
            let flagged = match &info.status {
                ConnectionStatus::Failed(_) => {
                    warn!("Detected failed connection for peer {:?}", peer_id);
                    true
                }
                ConnectionStatus::Disconnected => {
                    debug!("Detected disconnected peer {:?}", peer_id);
                    true
                }
                _ if !info.is_healthy() => {
                    debug!(
                        "Detected unhealthy connection for peer {:?} (quality: {:.2})",
                        peer_id, info.quality_score
                    );
                    true
                }
                _ => false,
            };
            flagged.then(|| peer_id)
        })
        .collect();

    if !unhealthy_peers.is_empty() {
        info!(
            "Health check found {} unhealthy connections",
            unhealthy_peers.len()
        );
    }
    Ok(unhealthy_peers)
}
/// Runs [`ConnectionManager::health_check`] and tries to recover each
/// unhealthy peer in turn.
///
/// Returns the number of peers recovered. Individual failures are logged and
/// skipped; only a `health_check` error is propagated.
pub async fn auto_recover(&self) -> Result<usize, NetworkError> {
    let candidates = self.health_check().await?;
    let total_unhealthy = candidates.len();
    let mut recovered_count: usize = 0;

    for peer_id in candidates {
        if let Err(e) = self.recover_connection(&peer_id).await {
            warn!(
                "Failed to auto-recover connection for peer {:?}: {}",
                peer_id, e
            );
            continue;
        }
        recovered_count += 1;
        debug!("Auto-recovered connection for peer {:?}", peer_id);
    }

    if recovered_count > 0 {
        info!(
            "Auto-recovery completed: {}/{} connections recovered",
            recovered_count, total_unhealthy
        );
    }
    Ok(recovered_count)
}
pub fn new(max_connections: usize) -> Self {
Self::with_pool_timeout(max_connections, std::time::Duration::from_secs(300))
}
pub fn with_pool_timeout(max_connections: usize, pool_timeout: std::time::Duration) -> Self {
let connection_limits = ConnectionLimits {
max_total: max_connections,
..Default::default()
};
Self {
max_connections,
connections: Arc::new(DashMap::new()),
connection_pool: Arc::new(DashMap::new()),
enhanced_pool: Arc::new(DashMap::new()),
multiplexer: Arc::new(ConnectionMultiplexer::new(32, Duration::from_secs(30))),
retry_manager: Arc::new(RetryManager::new()),
load_balancer: Arc::new(LoadBalancer::new(LoadBalancingStrategy::WeightedRoundRobin)),
health_monitor: Arc::new(HealthMonitor::new(HealthCheckConfig::default())),
warming_manager: Arc::new(WarmingManager::new(WarmingConfig::default())),
pool_timeout,
metrics: Arc::new(ParkingRwLock::new(NetworkMetrics::default())),
queue_metrics: Arc::new(ParkingRwLock::new(QueueMetrics::default())),
latency_metrics: Arc::new(ParkingRwLock::new(LatencyMetrics::default())),
throughput_metrics: Arc::new(ParkingRwLock::new(ThroughputMetrics::default())),
health_tracker: Arc::new(RwLock::new(ConnectionHealthTracker::default())),
circuit_breakers: Arc::new(DashMap::new()),
quality_scores: Arc::new(DashMap::new()),
maintenance_handle: None,
connection_limits,
monitoring_interval: Duration::from_secs(30),
}
}
pub async fn connect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
if !circuit_breaker.allow_request() {
return Err(NetworkError::ConnectionError(
"Circuit breaker is open for this peer".into(),
));
}
}
if let Some(entry) = self.connection_pool.get(&peer_id) {
let (conn_info, last_used) = entry.value();
if last_used.elapsed() < self.pool_timeout && conn_info.is_healthy() {
self.connections.insert(peer_id, conn_info.clone());
debug!("Reusing pooled healthy connection for peer {:?}", peer_id);
if let Some(mut circuit_breaker) = self.circuit_breakers.get_mut(&peer_id) {
circuit_breaker.record_result(true);
}
return Ok(());
} else {
self.connection_pool.remove(&peer_id);
debug!(
"Removing expired/unhealthy connection for peer {:?}",
peer_id
);
}
}
if self.connections.len() >= self.max_connections {
warn!("Max connections reached");
return Err(NetworkError::ConnectionError(
"Max connections reached".into(),
));
}
let connecting_info = ConnectionInfo::new(ConnectionStatus::Connecting);
self.connections.insert(peer_id, connecting_info);
debug!("Creating new connection for peer {:?}", peer_id);
let start_time = Instant::now();
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let connection_time = start_time.elapsed();
let success = rand::random::<f64>() > 0.1;
if success {
let mut connected_info = ConnectionInfo::new(ConnectionStatus::Connected);
connected_info.update_activity(true, connection_time, 0);
self.connections.insert(peer_id, connected_info.clone());
self.quality_scores
.insert(peer_id, connected_info.quality_score);
self.circuit_breakers
.entry(peer_id)
.or_insert_with(CircuitBreaker::default)
.record_result(true);
debug!(
"Successfully connected to peer {:?} in {:?}",
peer_id, connection_time
);
} else {
let failed_info =
ConnectionInfo::new(ConnectionStatus::Failed("Connection timeout".into()));
self.connections.insert(peer_id, failed_info);
self.circuit_breakers
.entry(peer_id)
.or_insert_with(CircuitBreaker::default)
.record_result(false);
return Err(NetworkError::ConnectionError(
"Failed to establish connection".into(),
));
}
Ok(())
}
/// Sets the status of `peer_id` without recording an operation sample.
///
/// Equivalent to `update_status_with_metrics` with no response time and zero
/// bytes transferred.
pub fn update_status(&self, peer_id: PeerId, status: ConnectionStatus) {
    let (response_time, bytes_transferred) = (None, 0);
    self.update_status_with_metrics(peer_id, status, response_time, bytes_transferred)
}
/// Sets the status of `peer_id` and, when `response_time` is given, records
/// an operation sample.
///
/// The sample counts as a success only if the new status is `Connected`.
/// For an already-tracked peer, the sample also refreshes its quality score
/// and feeds its circuit breaker. An untracked peer gets a new record. The
/// connection counters in `metrics` are refreshed afterwards.
pub fn update_status_with_metrics(
    &self,
    peer_id: PeerId,
    status: ConnectionStatus,
    response_time: Option<Duration>,
    bytes_transferred: u64,
) {
    match self.connections.get_mut(&peer_id) {
        Some(mut conn_info) => {
            let success = matches!(status, ConnectionStatus::Connected);
            conn_info.status = status;
            if let Some(rt) = response_time {
                conn_info.update_activity(success, rt, bytes_transferred);
                self.quality_scores.insert(peer_id, conn_info.quality_score);
                if let Some(mut breaker) = self.circuit_breakers.get_mut(&peer_id) {
                    breaker.record_result(success);
                }
            }
        }
        None => {
            let mut conn_info = ConnectionInfo::new(status);
            if let Some(rt) = response_time {
                let success = matches!(conn_info.status, ConnectionStatus::Connected);
                conn_info.update_activity(success, rt, bytes_transferred);
            }
            self.connections.insert(peer_id, conn_info);
        }
    }

    // Count first so the metrics lock is held only for the two writes.
    let total = self.connections.len();
    let healthy = self
        .connections
        .iter()
        .filter(|entry| entry.value().is_healthy())
        .count();
    let mut metrics = self.metrics.write();
    metrics.connections = total;
    metrics.active_connections = healthy;
}
pub fn disconnect(&self, peer_id: &PeerId) {
if let Some((_, conn_info)) = self.connections.remove(peer_id) {
debug!(
"Disconnected from peer {:?} with status {:?} (quality: {:.2})",
peer_id, conn_info.status, conn_info.quality_score
);
if conn_info.is_healthy() {
self.connection_pool
.insert(*peer_id, (conn_info, Instant::now()));
}
}
self.quality_scores.remove(peer_id);
if let Some(circuit_breaker) = self.circuit_breakers.get_mut(peer_id) {
if circuit_breaker.state == CircuitBreakerState::Closed {
}
}
self.cleanup_pool();
let mut metrics = self.metrics.write();
metrics.connections = self.connections.len();
let active_count = self
.connections
.iter()
.filter(|entry| entry.value().is_healthy())
.count();
metrics.active_connections = active_count;
}
fn cleanup_pool(&self) {
self.connection_pool
.retain(|_, (_, last_used)| last_used.elapsed() < self.pool_timeout);
}
pub fn connection_count(&self) -> usize {
self.connections.len()
}
pub fn get_status(&self, peer_id: &PeerId) -> Option<ConnectionStatus> {
self.connections
.get(peer_id)
.map(|entry| entry.value().status.clone())
}
pub fn get_connection_info(&self, peer_id: &PeerId) -> Option<ConnectionInfo> {
self.connections
.get(peer_id)
.map(|entry| entry.value().clone())
}
pub fn get_quality_score(&self, peer_id: &PeerId) -> Option<f64> {
self.quality_scores.get(peer_id).map(|entry| *entry.value())
}
pub fn get_circuit_breaker_state(&self, peer_id: &PeerId) -> Option<CircuitBreakerState> {
self.circuit_breakers
.get(peer_id)
.map(|entry| entry.value().state.clone())
}
/// Healthy peers with their quality scores, best score first.
///
/// The sort is stable, and incomparable scores are treated as equal.
pub fn get_healthy_connections(&self) -> Vec<(PeerId, f64)> {
    let mut ranked: Vec<(PeerId, f64)> = self
        .connections
        .iter()
        .filter_map(|entry| {
            let info = entry.value();
            if info.is_healthy() {
                Some((*entry.key(), info.quality_score))
            } else {
                None
            }
        })
        .collect();
    ranked.sort_by(|left, right| {
        right
            .1
            .partial_cmp(&left.1)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    ranked
}
pub fn update_metrics(&self, messages_per_second: f64, avg_latency_ms: u64) {
let latency_duration = std::time::Duration::from_millis(avg_latency_ms);
{
let mut metrics = self.metrics.write();
metrics.messages_per_second = messages_per_second;
metrics.avg_latency = latency_duration;
metrics.active_connections = self.connections.len();
}
{
let mut queue_metrics = self.queue_metrics.write();
queue_metrics.current_size = self.connection_pool.len();
queue_metrics.max_size = self.max_connections;
queue_metrics.utilization =
queue_metrics.current_size as f64 / queue_metrics.max_size as f64;
queue_metrics.messages_per_second = messages_per_second;
}
{
let mut latency_metrics = self.latency_metrics.write();
latency_metrics.avg_latency = latency_duration;
latency_metrics.peak_latency = latency_metrics.peak_latency.max(latency_duration);
}
{
let mut throughput_metrics = self.throughput_metrics.write();
throughput_metrics.messages_per_second = messages_per_second;
throughput_metrics.total_messages += 1;
throughput_metrics.avg_throughput =
(throughput_metrics.avg_throughput + messages_per_second) / 2.0;
throughput_metrics.peak_throughput =
throughput_metrics.peak_throughput.max(messages_per_second);
}
debug!(
"Updated network metrics: {} msg/s, {} ms latency",
messages_per_second, avg_latency_ms
);
}
pub fn get_queue_metrics(&self) -> QueueMetrics {
self.queue_metrics.read().clone()
}
pub fn get_latency_metrics(&self) -> LatencyMetrics {
self.latency_metrics.read().clone()
}
pub fn get_throughput_metrics(&self) -> ThroughputMetrics {
self.throughput_metrics.read().clone()
}
pub fn get_metrics(&self) -> NetworkMetrics {
self.metrics.read().clone()
}
/// Opens a stream of the given priority to `peer_id`.
///
/// If the peer has no entry in `connections`, it connects first.
pub async fn open_stream(
    &self,
    peer_id: PeerId,
    priority: Priority,
) -> Result<StreamId, NetworkError> {
    let already_tracked = self.connections.contains_key(&peer_id);
    if !already_tracked {
        self.connect(peer_id).await?;
    }
    self.multiplexer.open_stream(peer_id, priority).await
}
/// Closes `stream_id` via the multiplexer.
pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError> {
    let multiplexer = &self.multiplexer;
    multiplexer.close_stream(stream_id).await
}
/// Validates that `stream_id` exists and is `Active`, then logs the send.
///
/// NOTE(review): the payload is not transmitted by this implementation; only
/// its length is logged.
///
/// # Errors
///
/// [`NetworkError::ConnectionError`] if the stream is unknown or not active.
pub async fn send_stream_data(
    &self,
    stream_id: StreamId,
    data: Bytes,
) -> Result<(), NetworkError> {
    let stream_info = match self.multiplexer.get_stream_info(stream_id) {
        Some(info) => info,
        None => return Err(NetworkError::ConnectionError("Stream not found".into())),
    };
    if !matches!(stream_info.state, StreamState::Active) {
        return Err(NetworkError::ConnectionError("Stream not active".into()));
    }
    info!("Sending {} bytes on stream {:?}", data.len(), stream_id);
    Ok(())
}
pub async fn retry_connect(&self, peer_id: PeerId) -> Result<(), NetworkError> {
let retry_manager = self.retry_manager.clone();
retry_manager
.retry_operation(peer_id, || async { self.connect(peer_id).await })
.await
}
/// Asks the load balancer to pick one of `available_peers`.
pub async fn select_best_connection(&self, available_peers: &[PeerId]) -> Option<PeerId> {
    let balancer = &self.load_balancer;
    balancer.select_connection(available_peers).await
}
/// Registers `peer_id` with the health monitor.
pub async fn start_health_monitoring(&self, peer_id: PeerId) {
    let monitor = &self.health_monitor;
    monitor.start_monitoring(peer_id).await;
}
/// Runs a health check for `peer_id`, or returns `None` if the peer is not tracked.
pub async fn check_connection_health(&self, peer_id: PeerId) -> Option<HealthCheckResult> {
    match self.get_connection_info(&peer_id) {
        Some(connection_info) => Some(
            self.health_monitor
                .check_health(peer_id, &connection_info)
                .await,
        ),
        None => None,
    }
}
/// Asks the warming manager to warm connections to `peer_id`.
pub async fn warm_connections(&self, peer_id: PeerId) -> Result<(), NetworkError> {
    let warmer = &self.warming_manager;
    warmer.warm_connection(peer_id).await
}
/// Current warming state of `peer_id` as reported by the warming manager.
pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState {
    let warmer = &self.warming_manager;
    warmer.get_warming_state(peer_id)
}
/// Snapshot of connection counts and the statistics of every helper component.
///
/// `active_connections` counts healthy records; `pooled_connections` is the
/// size of the enhanced pool.
pub async fn get_connection_statistics(&self) -> ConnectionStatistics {
    let health_stats = self.health_monitor.stats.read().await.clone();
    let retry_stats = self.retry_manager.stats.read().await.clone();
    let warming_stats = self.warming_manager.stats.read().await.clone();
    let load_balancing_stats = self.load_balancer.stats.read().await.clone();

    let total_connections = self.connections.len();
    let active_connections = self
        .connections
        .iter()
        .filter(|entry| entry.value().is_healthy())
        .count();
    let pooled_connections = self.enhanced_pool.len();

    ConnectionStatistics {
        total_connections,
        active_connections,
        pooled_connections,
        health_stats,
        retry_stats,
        warming_stats,
        load_balancing_stats,
    }
}
pub fn set_connection_limits(&mut self, limits: ConnectionLimits) {
self.max_connections = limits.max_total;
self.connection_limits = limits;
}
pub fn get_connection_limits(&self) -> &ConnectionLimits {
&self.connection_limits
}
}
/// Aggregate view returned by `ConnectionManager::get_connection_statistics`.
#[derive(Debug, Clone)]
pub struct ConnectionStatistics {
    /// All tracked connection records.
    pub total_connections: usize,
    /// Records that pass `ConnectionInfo::is_healthy`.
    pub active_connections: usize,
    /// Entries in the enhanced pool.
    pub pooled_connections: usize,
    /// Health-monitor statistics.
    pub health_stats: HealthStats,
    /// Retry-manager statistics.
    pub retry_stats: RetryStats,
    /// Warming-manager statistics.
    pub warming_stats: WarmingStats,
    /// Load-balancer statistics.
    pub load_balancing_stats: LoadBalancingStats,
}
#[cfg(test)]
mod tests {
    //! Unit tests plus ad-hoc, print-only benchmarks for the connection layer.
    //!
    //! NOTE(review): the `SecureConnection` tests pass 127.0.0.1:8000 as the remote address
    //! and presumably need a QUIC peer listening there — TODO confirm before relying on them
    //! in CI.
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Instant;
    use tokio::time::Duration;

    /// Builds a [`SecureConfig`] with freshly generated keys, a 5 s timeout and a 10 s keepalive.
    fn setup_test_config() -> SecureConfig {
        SecureConfig {
            transport_keys: TransportKeys::generate(),
            timeout: std::time::Duration::from_secs(5),
            keepalive: std::time::Duration::from_secs(10),
        }
    }

    /// Creates a secure connection and sends one message over it.
    #[tokio::test]
    async fn test_secure_connection() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;
        let mut connection = SecureConnection::new(&endpoint, test_addr, test_config)
            .await
            .expect("Failed to create secure connection");
        let test_data = Bytes::from(b"test message".to_vec());
        connection
            .send(test_data)
            .await
            .expect("Failed to send message");
    }

    /// Exercises the connection cap, status tracking, disconnect and metrics updates.
    #[tokio::test]
    async fn test_connection_management() {
        let manager = ConnectionManager::new(2);
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();
        assert!(manager.connect(peer1).await.is_ok());
        assert!(manager.connect(peer2).await.is_ok());
        assert!(manager.connect(peer3).await.is_ok());
        // The manager was built with a cap of 2, so the third connect must not grow the count.
        assert_eq!(manager.connection_count(), 2);
        manager.update_status(peer1, ConnectionStatus::Connected);
        assert_eq!(
            manager.get_status(&peer1),
            Some(ConnectionStatus::Connected)
        );
        manager.disconnect(&peer1);
        assert_eq!(manager.get_status(&peer1), None);
        assert_eq!(manager.connection_count(), 1);
        manager.update_metrics(1000.0, 50);
        let metrics = manager.get_metrics();
        assert_eq!(metrics.messages_per_second, 1000.0);
        assert_eq!(metrics.connections, 1);
    }

    /// Times 1000 `connect` calls against a manager capped at 100 connections.
    #[tokio::test]
    async fn bench_route_computation() {
        let manager = ConnectionManager::new(100);
        let mut latencies = Vec::with_capacity(1000);
        for _ in 0..1000 {
            let peer = PeerId::random();
            let start = Instant::now();
            manager.connect(peer).await.unwrap();
            latencies.push(start.elapsed());
        }
        let avg_latency = latencies.iter().sum::<Duration>() / latencies.len() as u32;
        let max_latency = latencies.iter().max().unwrap();
        println!("Route Computation Benchmark:");
        println!("Average latency: {:?}", avg_latency);
        println!("Maximum latency: {:?}", max_latency);
        println!("Total routes: {}", manager.connection_count());
    }

    /// Reports how often a peer is already present in `connection_pool` before connecting.
    #[tokio::test]
    async fn bench_cache_efficiency() {
        let manager = ConnectionManager::new(1000);
        let mut hit_count = 0;
        let iterations = 10000;
        for _ in 0..iterations {
            // NOTE(review): every iteration draws a fresh random peer, so a hit presumably
            // only happens on a PeerId collision and the reported rate is ~0 %.
            let peer = PeerId::random();
            if manager.connection_pool.get(&peer).is_some() {
                hit_count += 1;
            } else {
                manager.connect(peer).await.unwrap();
            }
        }
        let hit_rate = (hit_count as f64 / iterations as f64) * 100.0;
        println!("Cache Efficiency Benchmark:");
        println!("Cache hit rate: {:.2}%", hit_rate);
        println!("Pool size: {}", manager.connection_pool.len());
    }

    /// Times 100 `SecureConnection::new` calls; their results are intentionally ignored.
    #[tokio::test]
    async fn bench_circuit_setup() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;
        let mut setup_times = Vec::with_capacity(100);
        for _ in 0..100 {
            let start = Instant::now();
            let _connection =
                SecureConnection::new(&endpoint, test_addr, test_config.clone()).await;
            setup_times.push(start.elapsed());
        }
        let avg_setup = setup_times.iter().sum::<Duration>() / setup_times.len() as u32;
        println!("Circuit Setup Benchmark:");
        println!("Average setup time: {:?}", avg_setup);
    }

    /// Connects 100 peers, then times a second `connect` to each of them.
    #[tokio::test]
    async fn bench_connection_pooling() {
        let manager = ConnectionManager::with_pool_timeout(1000, Duration::from_secs(60));
        let test_peers: Vec<PeerId> = (0..100).map(|_| PeerId::random()).collect();
        let mut reuse_times = Vec::with_capacity(test_peers.len());
        for peer in test_peers.iter() {
            manager.connect(*peer).await.unwrap();
        }
        for peer in test_peers.iter() {
            let start = Instant::now();
            manager.connect(*peer).await.unwrap();
            reuse_times.push(start.elapsed());
        }
        let avg_reuse = reuse_times.iter().sum::<Duration>() / reuse_times.len() as u32;
        println!("Connection Pooling Benchmark:");
        println!("Average reuse time: {:?}", avg_reuse);
        println!(
            "Pool utilization: {:.2}%",
            (manager.get_queue_metrics().utilization * 100.0)
        );
    }

    /// Sends 10 000 1 KiB messages over one connection and reports throughput.
    #[tokio::test]
    async fn bench_message_throughput() {
        let test_config = setup_test_config();
        let test_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
        let server_config = ServerConfig::default();
        let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap())
            .unwrap()
            .0;
        let mut connection = SecureConnection::new(&endpoint, test_addr, test_config)
            .await
            .unwrap();
        let start = Instant::now();
        let message_count = 10000;
        let message_size = 1024;
        for _ in 0..message_count {
            let data = Bytes::from(vec![1u8; message_size]);
            connection.send(data).await.unwrap();
        }
        let elapsed = start.elapsed();
        let throughput = message_count as f64 / elapsed.as_secs_f64();
        let mb_per_sec = (throughput * message_size as f64) / (1024.0 * 1024.0);
        println!("Message Throughput Benchmark:");
        println!("Messages per second: {:.2}", throughput);
        println!("Throughput: {:.2} MB/s", mb_per_sec);
        println!("Total time: {:?}", elapsed);
    }
}
impl ConnectionMultiplexer {
pub fn new(max_streams_per_connection: u32, stream_timeout: Duration) -> Self {
Self {
connections: Arc::new(DashMap::new()),
stream_routes: Arc::new(DashMap::new()),
priority_queue: Arc::new(TokioRwLock::new(BTreeMap::new())),
max_streams_per_connection,
stream_timeout,
}
}
pub async fn open_stream(
&self,
peer_id: PeerId,
priority: Priority,
) -> Result<StreamId, NetworkError> {
let mut connection = self
.connections
.get_mut(&peer_id)
.ok_or_else(|| NetworkError::ConnectionError("Connection not found".into()))?;
if connection.streams.len() >= self.max_streams_per_connection as usize {
return Err(NetworkError::ConnectionError(
"Maximum streams reached".into(),
));
}
let _ = connection
.stream_semaphore
.acquire()
.await
.map_err(|_| NetworkError::ConnectionError("Stream semaphore closed".into()))?;
let stream_id = StreamId(connection.next_stream_id);
connection.next_stream_id += 1;
let stream_info = StreamInfo {
id: stream_id,
priority,
state: StreamState::Opening,
created_at: Instant::now(),
last_activity: Instant::now(),
bytes_transferred: 0,
};
connection.streams.insert(stream_id, stream_info);
self.stream_routes.insert(stream_id, peer_id);
let mut queue = self.priority_queue.write().await;
queue
.entry(priority)
.or_insert_with(VecDeque::new)
.push_back(stream_id);
Ok(stream_id)
}
pub async fn close_stream(&self, stream_id: StreamId) -> Result<(), NetworkError> {
let peer_id = self
.stream_routes
.remove(&stream_id)
.ok_or_else(|| NetworkError::ConnectionError("Stream not found".into()))?
.1;
if let Some(mut connection) = self.connections.get_mut(&peer_id) {
if let Some(stream) = connection.streams.get_mut(&stream_id) {
stream.state = StreamState::Closed;
stream.last_activity = Instant::now();
}
connection.streams.remove(&stream_id);
connection.utilization =
connection.streams.len() as f64 / self.max_streams_per_connection as f64;
}
Ok(())
}
pub fn get_stream_info(&self, stream_id: StreamId) -> Option<StreamInfo> {
let peer_id = self.stream_routes.get(&stream_id)?.value().clone();
let connection = self.connections.get(&peer_id)?;
connection.streams.get(&stream_id).cloned()
}
}
impl RetryManager {
pub fn new() -> Self {
Self {
retry_configs: Arc::new(DashMap::new()),
default_config: RetryConfig::default(),
stats: Arc::new(TokioRwLock::new(RetryStats::default())),
}
}
pub async fn retry_operation<F, Fut, T, E>(&self, peer_id: PeerId, operation: F) -> Result<T, E>
where
F: Fn() -> Fut + Send + Sync,
Fut: Future<Output = Result<T, E>> + Send,
E: std::fmt::Debug,
{
let config = self
.retry_configs
.get(&peer_id)
.map(|entry| entry.value().clone())
.unwrap_or_else(|| self.default_config.clone());
let mut attempt = 0;
let mut backoff = config.initial_backoff;
loop {
let start = Instant::now();
let result = operation().await;
let _duration = start.elapsed();
match result {
Ok(value) => {
let mut stats = self.stats.write().await;
stats.total_attempts += 1;
stats.successful_retries += 1;
return Ok(value);
}
Err(error) => {
attempt += 1;
if attempt >= config.max_retries {
let mut stats = self.stats.write().await;
stats.total_attempts += 1;
stats.failed_retries += 1;
return Err(error);
}
let jitter = (rand::random::<f64>() - 0.5) * 2.0 * config.jitter_factor;
let backoff_with_jitter = Duration::from_millis(
((backoff.as_millis() as f64) * (1.0 + jitter)) as u64,
);
sleep(backoff_with_jitter).await;
backoff = std::cmp::min(
Duration::from_millis(
(backoff.as_millis() as f64 * config.backoff_multiplier) as u64,
),
config.max_backoff,
);
}
}
}
}
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(30),
backoff_multiplier: 2.0,
jitter_factor: 0.1,
timeout: Duration::from_secs(10),
}
}
}
impl LoadBalancer {
pub fn new(strategy: LoadBalancingStrategy) -> Self {
Self {
strategy,
weights: Arc::new(DashMap::new()),
round_robin_counter: AtomicU64::new(0),
stats: Arc::new(TokioRwLock::new(LoadBalancingStats::default())),
}
}
pub async fn select_connection(&self, available_peers: &[PeerId]) -> Option<PeerId> {
if available_peers.is_empty() {
return None;
}
let selected = match self.strategy {
LoadBalancingStrategy::RoundRobin => {
let index = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) as usize;
available_peers[index % available_peers.len()]
}
LoadBalancingStrategy::WeightedRoundRobin => {
self.select_weighted_round_robin(available_peers).await
}
LoadBalancingStrategy::LeastConnections => {
self.select_least_connections(available_peers).await
}
LoadBalancingStrategy::ResponseTime => {
self.select_best_response_time(available_peers).await
}
LoadBalancingStrategy::ResourceUtilization => {
self.select_least_utilized(available_peers).await
}
};
let mut stats = self.stats.write().await;
stats.total_requests += 1;
*stats.peer_distribution.entry(selected).or_insert(0) += 1;
Some(selected)
}
async fn select_weighted_round_robin(&self, peers: &[PeerId]) -> PeerId {
let mut total_weight = 0.0;
let mut weighted_peers = Vec::new();
for &peer_id in peers {
let weight = self
.weights
.get(&peer_id)
.map(|entry| *entry.value())
.unwrap_or(1.0);
total_weight += weight;
weighted_peers.push((peer_id, weight));
}
let mut target = rand::random::<f64>() * total_weight;
for (peer_id, weight) in weighted_peers {
target -= weight;
if target <= 0.0 {
return peer_id;
}
}
peers[0] }
async fn select_least_connections(&self, peers: &[PeerId]) -> PeerId {
peers[0]
}
async fn select_best_response_time(&self, peers: &[PeerId]) -> PeerId {
let stats = self.stats.read().await;
let mut best_peer = peers[0];
let mut best_time = Duration::from_secs(u64::MAX);
for &peer_id in peers {
if let Some(avg_time) = stats.avg_response_times.get(&peer_id) {
if *avg_time < best_time {
best_time = *avg_time;
best_peer = peer_id;
}
}
}
best_peer
}
async fn select_least_utilized(&self, peers: &[PeerId]) -> PeerId {
peers[0]
}
}
impl HealthMonitor {
    /// Creates a monitor with `config`, an empty result cache, an empty schedule and zeroed
    /// statistics.
    pub fn new(config: HealthCheckConfig) -> Self {
        Self {
            config,
            results: Arc::new(DashMap::new()),
            scheduler: Arc::new(TokioRwLock::new(HealthCheckScheduler {
                scheduled_checks: HashMap::new(),
                check_intervals: HashMap::new(),
            })),
            stats: Arc::new(TokioRwLock::new(HealthStats::default())),
        }
    }

    /// Schedules `peer_id` for periodic checks. The next check is due one `config.interval`
    /// from now, and that interval is recorded for the peer.
    pub async fn start_monitoring(&self, peer_id: PeerId) {
        let mut scheduler = self.scheduler.write().await;
        scheduler
            .scheduled_checks
            .insert(peer_id, Instant::now() + self.config.interval);
        scheduler
            .check_intervals
            .insert(peer_id, self.config.interval);
    }

    /// Runs a ping check against `connection`, caches the result for `peer_id` and folds it
    /// into the aggregate statistics (check counts and mean response time).
    ///
    /// NOTE(review): `config.check_type` and `config.timeout` are not consulted here. A
    /// default `PingHealthCheck` is always used.
    pub async fn check_health(
        &self,
        peer_id: PeerId,
        connection: &ConnectionInfo,
    ) -> HealthCheckResult {
        let checker = PingHealthCheck::default();
        let result = checker.check(&peer_id, connection).await;
        self.results.insert(peer_id, result.clone());
        let mut stats = self.stats.write().await;
        stats.total_checks += 1;
        if result.success {
            stats.successful_checks += 1;
        } else {
            stats.failed_checks += 1;
        }
        // Incremental mean over all `total_checks` samples, including this one:
        //     avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n
        // The previous formula used n samples over n + 1, so the first check was recorded as
        // half its real latency. Nanoseconds keep sub-millisecond samples from truncating to 0.
        let samples = stats.total_checks as f64;
        let previous = stats.avg_response_time.as_nanos() as f64;
        let latest = result.response_time.as_nanos() as f64;
        stats.avg_response_time =
            Duration::from_nanos((previous + (latest - previous) / samples) as u64);
        result
    }

    /// Returns the most recently cached check result for `peer_id`, if any.
    pub fn get_health_result(&self, peer_id: &PeerId) -> Option<HealthCheckResult> {
        self.results.get(peer_id).map(|entry| entry.value().clone())
    }
}
impl Default for HealthCheckConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(30),
timeout: Duration::from_secs(5),
failure_threshold: 3,
recovery_threshold: 2,
check_type: HealthCheckType::Ping,
}
}
}
impl WarmingManager {
    /// Creates a manager with `config`, no tracked peers and zeroed statistics.
    pub fn new(config: WarmingConfig) -> Self {
        Self {
            config,
            warming_states: Arc::new(DashMap::new()),
            warming_tasks: Arc::new(DashMap::new()),
            stats: Arc::new(TokioRwLock::new(WarmingStats::default())),
        }
    }

    /// Marks `peer_id` as warming, then records the outcome and timing statistics.
    ///
    /// Returns `Ok(())` immediately, without touching any state, when warming is disabled.
    ///
    /// NOTE(review): this is a simulation. It sleeps a fixed 100 ms and then fails at random
    /// about 10 % of the time. `warming_timeout`, `warming_retries`, `min_pool_size` and
    /// `predictive_threshold` are not consulted in this impl.
    ///
    /// # Errors
    /// Returns `NetworkError::ConnectionError` when the warm-up fails. The peer's state is
    /// then `WarmingState::FailedToWarm`.
    pub async fn warm_connection(&self, peer_id: PeerId) -> Result<(), NetworkError> {
        if !self.config.enabled {
            return Ok(());
        }
        self.warming_states.insert(peer_id, WarmingState::Warming);
        let start = Instant::now();
        sleep(Duration::from_millis(100)).await;
        let warming_time = start.elapsed();
        let mut stats = self.stats.write().await;
        stats.total_attempts += 1;
        if rand::random::<f64>() > 0.1 {
            self.warming_states.insert(peer_id, WarmingState::Warm);
            stats.successful_warmings += 1;
            // Incremental mean over all successful warmings, including this one. The previous
            // formula divided by n + 1 after already counting this sample, which biased the
            // average low. Nanoseconds avoid millisecond truncation.
            let samples = stats.successful_warmings as f64;
            let previous = stats.avg_warming_time.as_nanos() as f64;
            let latest = warming_time.as_nanos() as f64;
            stats.avg_warming_time =
                Duration::from_nanos((previous + (latest - previous) / samples) as u64);
            Ok(())
        } else {
            self.warming_states.insert(
                peer_id,
                WarmingState::FailedToWarm("Warming timeout".to_string()),
            );
            stats.failed_warmings += 1;
            Err(NetworkError::ConnectionError(
                "Connection warming failed".into(),
            ))
        }
    }

    /// Returns the recorded warming state for `peer_id`, or `WarmingState::Cold` if the peer
    /// has never been warmed.
    pub fn get_warming_state(&self, peer_id: &PeerId) -> WarmingState {
        self.warming_states
            .get(peer_id)
            .map(|entry| entry.value().clone())
            .unwrap_or(WarmingState::Cold)
    }
}
impl Default for WarmingConfig {
fn default() -> Self {
Self {
enabled: true,
min_pool_size: 5,
warming_timeout: Duration::from_secs(10),
warming_retries: 3,
predictive_threshold: 0.8,
}
}
}