use qudag_network::connection::{
CircuitBreaker, CircuitBreakerState, ConnectionInfo, ConnectionManager,
HealthStatistics, SecureConfig, SecureConnection, TransportKeys, UnhealthyConnectionInfo,
};
use qudag_network::types::{ConnectionStatus, NetworkError, PeerId};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::time::{sleep, timeout};
#[tokio::test]
async fn test_connection_manager_basic_operations() {
let manager = ConnectionManager::new(10);
let peer1 = PeerId::random();
let peer2 = PeerId::random();
assert!(manager.connect(peer1).await.is_ok());
assert!(manager.connect(peer2).await.is_ok());
assert_eq!(manager.connection_count(), 2);
for i in 0..10 {
let peer = PeerId::random();
let result = manager.connect(peer).await;
if i < 8 {
assert!(result.is_ok());
} else {
assert!(matches!(result, Err(NetworkError::ConnectionLimitReached)));
}
}
}
#[tokio::test]
async fn test_connection_status_updates() {
let manager = ConnectionManager::new(10);
let peer = PeerId::random();
manager.connect(peer).await.unwrap();
manager.update_status(peer, ConnectionStatus::Connected);
assert_eq!(manager.get_status(&peer), Some(ConnectionStatus::Connected));
manager.update_status(peer, ConnectionStatus::Disconnected);
assert_eq!(manager.get_status(&peer), Some(ConnectionStatus::Disconnected));
let unknown = PeerId::random();
assert_eq!(manager.get_status(&unknown), None);
}
#[tokio::test]
async fn test_connection_manager_disconnection() {
let manager = ConnectionManager::new(10);
let peer = PeerId::random();
manager.connect(peer).await.unwrap();
assert_eq!(manager.connection_count(), 1);
manager.disconnect(&peer);
assert_eq!(manager.connection_count(), 0);
assert_eq!(manager.get_status(&peer), None);
}
#[tokio::test]
async fn test_connection_metrics() {
let manager = ConnectionManager::new(10);
for _ in 0..5 {
manager.connect(PeerId::random()).await.unwrap();
}
manager.update_metrics(1000.0, 50);
let metrics = manager.get_metrics();
assert_eq!(metrics.active_connections, 5);
assert!(metrics.messages_per_second > 0.0);
assert!(metrics.bytes_per_second > 0.0);
}
#[tokio::test]
async fn test_health_check() {
let manager = ConnectionManager::new(10);
let peer1 = PeerId::random();
let peer2 = PeerId::random();
manager.connect(peer1).await.unwrap();
manager.connect(peer2).await.unwrap();
manager.update_status(peer1, ConnectionStatus::Connected);
manager.update_status(peer2, ConnectionStatus::Disconnected);
let health = manager.health_check();
assert_eq!(health.total_connections, 2);
assert_eq!(health.healthy_connections, 1);
assert_eq!(health.unhealthy_connections.len(), 1);
}
#[tokio::test]
async fn test_circuit_breaker_basic() {
let breaker = CircuitBreaker::new(3, Duration::from_secs(1), Duration::from_secs(5));
assert!(matches!(breaker.state(), CircuitBreakerState::Closed));
assert!(breaker.can_proceed());
for _ in 0..3 {
breaker.record_failure();
}
assert!(matches!(breaker.state(), CircuitBreakerState::Open));
assert!(!breaker.can_proceed());
}
#[tokio::test]
async fn test_circuit_breaker_recovery() {
let breaker = CircuitBreaker::new(2, Duration::from_millis(100), Duration::from_millis(200));
breaker.record_failure();
breaker.record_failure();
assert!(matches!(breaker.state(), CircuitBreakerState::Open));
sleep(Duration::from_millis(250)).await;
assert!(matches!(breaker.state(), CircuitBreakerState::HalfOpen));
assert!(breaker.can_proceed());
breaker.record_success();
assert!(matches!(breaker.state(), CircuitBreakerState::Closed));
}
#[tokio::test]
async fn test_circuit_breaker_half_open_failure() {
let breaker = CircuitBreaker::new(1, Duration::from_millis(50), Duration::from_millis(100));
breaker.record_failure();
assert!(matches!(breaker.state(), CircuitBreakerState::Open));
sleep(Duration::from_millis(150)).await;
assert!(matches!(breaker.state(), CircuitBreakerState::HalfOpen));
breaker.record_failure();
assert!(matches!(breaker.state(), CircuitBreakerState::Open));
}
#[tokio::test]
async fn test_secure_connection_creation() {
let config = SecureConfig {
enable_encryption: true,
enable_authentication: true,
timeout: Duration::from_secs(30),
max_retries: 3,
};
let peer = PeerId::random();
let conn = SecureConnection::new(peer, config.clone());
assert_eq!(conn.peer_id(), peer);
assert_eq!(conn.config().enable_encryption, true);
assert!(conn.is_secure());
}
#[tokio::test]
async fn test_transport_keys_generation() {
let keys1 = TransportKeys::generate();
let keys2 = TransportKeys::generate();
assert_ne!(keys1.public_key(), keys2.public_key());
assert_ne!(keys1.private_key(), keys2.private_key());
assert_eq!(keys1.public_key().len(), 32);
assert_eq!(keys1.private_key().len(), 32);
}
#[tokio::test]
async fn test_connection_info() {
let peer = PeerId::random();
let info = ConnectionInfo::new(peer, "127.0.0.1:8080".to_string());
assert_eq!(info.peer_id(), peer);
assert_eq!(info.address(), "127.0.0.1:8080");
assert!(info.connected_at().elapsed() < Duration::from_secs(1));
assert_eq!(info.bytes_sent(), 0);
assert_eq!(info.bytes_received(), 0);
info.update_sent(1000);
info.update_received(2000);
assert_eq!(info.bytes_sent(), 1000);
assert_eq!(info.bytes_received(), 2000);
}
#[tokio::test]
async fn test_concurrent_connection_management() {
let manager = Arc::new(ConnectionManager::new(100));
let mut handles = vec![];
for i in 0..50 {
let mgr = Arc::clone(&manager);
let handle = tokio::spawn(async move {
let peer = PeerId::from_bytes([i as u8; 32]);
mgr.connect(peer).await
});
handles.push(handle);
}
let results: Vec<_> = futures::future::join_all(handles).await;
let successful = results.iter().filter(|r| r.is_ok()).count();
assert_eq!(successful, 50);
assert_eq!(manager.connection_count(), 50);
}
#[tokio::test]
async fn test_connection_manager_stress() {
let manager = Arc::new(ConnectionManager::new(1000));
let mut handles = vec![];
for i in 0..100 {
let mgr = Arc::clone(&manager);
let handle = tokio::spawn(async move {
let peer = PeerId::from_bytes([i as u8; 32]);
mgr.connect(peer).await.unwrap();
for _ in 0..10 {
match i % 3 {
0 => mgr.update_status(peer, ConnectionStatus::Connected),
1 => mgr.update_metrics(100.0, 10),
_ => { let _ = mgr.get_status(&peer); }
}
tokio::time::sleep(Duration::from_micros(100)).await;
}
if i % 2 == 0 {
mgr.disconnect(&peer);
}
});
handles.push(handle);
}
timeout(Duration::from_secs(10), futures::future::join_all(handles))
.await
.expect("Stress test timed out");
let health = manager.health_check();
assert!(health.total_connections <= 100);
assert!(health.healthy_connections <= health.total_connections);
}
#[tokio::test]
async fn test_connection_manager_edge_cases() {
let manager = ConnectionManager::new(5);
let peer = PeerId::random();
assert!(manager.connect(peer).await.is_ok());
assert!(manager.connect(peer).await.is_ok()); assert_eq!(manager.connection_count(), 1);
manager.disconnect(&peer);
manager.disconnect(&peer); assert_eq!(manager.connection_count(), 0);
let unknown = PeerId::random();
assert_eq!(manager.get_status(&unknown), None);
manager.update_status(unknown, ConnectionStatus::Connected); manager.disconnect(&unknown); }