use qudag_network::{
ConnectionManager, ConnectionStatus, HopInfo, MessageEnvelope, MessagePriority, MessageQueue,
NetworkError, NetworkMessage, PeerId, Router, RoutingStrategy,
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::time::timeout;
#[tokio::test]
async fn test_anonymous_routing_properties() {
let router = Router::new();
let peers: Vec<_> = (0..10).map(|_| PeerId::random()).collect();
for peer in &peers {
router.add_peer(*peer).await;
}
let source_peer = peers[0];
let dest_peer = peers[9];
let msg = NetworkMessage {
id: "anonymous_test".into(),
source: source_peer.to_bytes().to_vec(),
destination: dest_peer.to_bytes().to_vec(),
payload: vec![0; 100],
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let mut routes = Vec::new();
for _ in 0..10 {
let route = router
.route(&msg, RoutingStrategy::Anonymous { hops: 5 })
.await
.unwrap();
routes.push(route);
}
for route in &routes {
assert!(!route.contains(&source_peer), "Route contains source peer");
assert!(
!route.contains(&dest_peer),
"Route contains destination peer"
);
assert_eq!(route.len(), 5, "Route has wrong number of hops");
let unique_peers: HashSet<_> = route.iter().collect();
assert_eq!(
unique_peers.len(),
route.len(),
"Route contains duplicate peers"
);
}
let unique_routes: HashSet<_> = routes.iter().collect();
assert!(
unique_routes.len() > 1,
"All routes are identical - not sufficiently random"
);
}
#[tokio::test]
async fn test_hop_isolation() {
let router = Router::new();
let peers: Vec<_> = (0..6).map(|_| PeerId::random()).collect();
for peer in &peers {
router.add_peer(*peer).await;
}
let msg = NetworkMessage {
id: "isolation_test".into(),
source: peers[0].to_bytes().to_vec(),
destination: peers[5].to_bytes().to_vec(),
payload: vec![0; 100],
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let route = router
.route(&msg, RoutingStrategy::Anonymous { hops: 4 })
.await
.unwrap();
for (i, peer_id) in route.iter().enumerate() {
let hop_info = router.get_hop_info(peer_id).await;
assert!(
hop_info.is_ok(),
"Missing hop info for peer at position {}",
i
);
let info = hop_info.unwrap();
assert!(
info.can_decrypt_layer(i),
"Hop {} cannot decrypt its own layer",
i
);
for j in 0..route.len() {
if j != i {
assert!(
!info.can_decrypt_layer(j),
"Hop {} can decrypt layer {} (should only decrypt layer {})",
i,
j,
i
);
}
}
let mut expected_known_peers = HashSet::new();
if i > 0 {
expected_known_peers.insert(route[i - 1]);
}
if i < route.len() - 1 {
expected_known_peers.insert(route[i + 1]);
}
for peer in &route {
if expected_known_peers.contains(peer) {
assert!(
info.knows_peer(peer),
"Hop {} should know about adjacent peer",
i
);
} else if *peer != *peer_id {
assert!(
!info.knows_peer(peer),
"Hop {} should not know about non-adjacent peer",
i
);
}
}
}
}
#[tokio::test]
async fn test_traffic_analysis_resistance() {
let router = Router::new();
let peers: Vec<_> = (0..20).map(|_| PeerId::random()).collect();
for peer in &peers {
router.add_peer(*peer).await;
}
let source_peer = peers[0];
let dest_peer = peers[19];
let mut routes = Vec::new();
for i in 0..50 {
let msg = NetworkMessage {
id: format!("traffic_test_{}", i),
source: source_peer.to_bytes().to_vec(),
destination: dest_peer.to_bytes().to_vec(),
payload: vec![0; 100 + (i % 500)], priority: if i % 3 == 0 {
MessagePriority::High
} else {
MessagePriority::Normal
},
ttl: Duration::from_secs(60 + (i % 300) as u64), };
let route = router
.route(&msg, RoutingStrategy::Anonymous { hops: 3 })
.await
.unwrap();
routes.push(route);
}
let mut hop_frequency = std::collections::HashMap::new();
let mut route_patterns = HashSet::new();
for route in &routes {
for peer in route {
*hop_frequency.entry(*peer).or_insert(0) += 1;
}
route_patterns.insert(route.clone());
}
let max_frequency = hop_frequency.values().max().unwrap_or(&0);
let total_hops = routes.len() * 3; let max_allowed_frequency = total_hops / 3;
assert!(
*max_frequency <= max_allowed_frequency,
"Peer appears too frequently in routes: {} out of {} total hops",
max_frequency,
total_hops
);
let diversity_ratio = route_patterns.len() as f64 / routes.len() as f64;
assert!(
diversity_ratio >= 0.7,
"Route diversity too low: {:.2}% unique routes",
diversity_ratio * 100.0
);
}
#[tokio::test]
async fn test_timing_attack_resistance() {
let router = Router::new();
let peers: Vec<_> = (0..8).map(|_| PeerId::random()).collect();
for peer in &peers {
router.add_peer(*peer).await;
}
let msg = NetworkMessage {
id: "timing_test".into(),
source: peers[0].to_bytes().to_vec(),
destination: peers[7].to_bytes().to_vec(),
payload: vec![0; 100],
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let mut routing_times = Vec::new();
for _ in 0..100 {
let start = std::time::Instant::now();
let _route = router
.route(&msg, RoutingStrategy::Anonymous { hops: 3 })
.await
.unwrap();
let elapsed = start.elapsed();
routing_times.push(elapsed);
}
let avg_time = routing_times.iter().sum::<Duration>() / routing_times.len() as u32;
let max_time = routing_times.iter().max().unwrap();
let min_time = routing_times.iter().min().unwrap();
let variance = routing_times
.iter()
.map(|t| {
let diff = if *t > avg_time {
*t - avg_time
} else {
avg_time - *t
};
diff.as_nanos() as f64
})
.map(|diff| diff * diff)
.sum::<f64>()
/ routing_times.len() as f64;
let std_dev = variance.sqrt();
let cv = std_dev / avg_time.as_nanos() as f64;
println!(
"Timing analysis - Avg: {:?}, Min: {:?}, Max: {:?}, CV: {:.3}",
avg_time, min_time, max_time, cv
);
assert!(cv < 0.5, "Timing variance too high: {:.3}", cv);
let max_ratio = max_time.as_nanos() as f64 / min_time.as_nanos() as f64;
assert!(
max_ratio < 10.0,
"Extreme timing difference detected: {:.2}x",
max_ratio
);
}
#[tokio::test]
async fn test_connection_metadata_protection() {
let manager = ConnectionManager::new(20);
let mut peer_connections = Vec::new();
for i in 0..10 {
let peer_id = PeerId::random();
peer_connections.push(peer_id);
manager.connect(peer_id).await.unwrap();
manager
.update_status(peer_id, ConnectionStatus::Connected)
.await;
tokio::time::sleep(Duration::from_millis(i as u64 * 10)).await;
}
for i in 0..100 {
let msg_rate = 100.0 + (i as f64 * 10.0) + (i as f64).sin() * 50.0;
let latency = 20 + (i % 50) as u64;
manager.update_metrics(msg_rate, latency).await;
if i % 10 == 0 {
tokio::time::sleep(Duration::from_millis(1)).await;
}
}
let metrics = manager.get_metrics().await;
let queue_metrics = manager.get_queue_metrics();
let latency_metrics = manager.get_latency_metrics();
let throughput_metrics = manager.get_throughput_metrics();
assert!(metrics.connections > 0);
assert!(metrics.messages_per_second > 0.0);
assert!(queue_metrics.utilization >= 0.0 && queue_metrics.utilization <= 1.0);
assert!(latency_metrics.avg_latency > Duration::ZERO);
assert!(throughput_metrics.messages_per_second > 0.0);
}
#[tokio::test]
async fn test_statistical_analysis_resistance() {
let router = Router::new();
let peers: Vec<_> = (0..15).map(|_| PeerId::random()).collect();
for peer in &peers {
router.add_peer(*peer).await;
}
let mut all_routes = Vec::new();
for source_idx in 0..5 {
for dest_idx in 10..15 {
for _ in 0..20 {
let msg = NetworkMessage {
id: format!("stat_test_{}_{}", source_idx, dest_idx),
source: peers[source_idx].to_bytes().to_vec(),
destination: peers[dest_idx].to_bytes().to_vec(),
payload: vec![0; 100],
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let route = router
.route(&msg, RoutingStrategy::Anonymous { hops: 4 })
.await
.unwrap();
all_routes.push((source_idx, dest_idx, route));
}
}
}
let mut source_route_correlation = std::collections::HashMap::new();
let mut dest_route_correlation = std::collections::HashMap::new();
for (source_idx, dest_idx, route) in &all_routes {
let route_pattern = route.iter().map(|p| p.to_bytes()[0]).collect::<Vec<_>>();
source_route_correlation
.entry(*source_idx)
.or_insert_with(Vec::new)
.push(route_pattern.clone());
dest_route_correlation
.entry(*dest_idx)
.or_insert_with(Vec::new)
.push(route_pattern);
}
for (source_idx, routes) in &source_route_correlation {
let unique_routes: HashSet<_> = routes.iter().collect();
let diversity = unique_routes.len() as f64 / routes.len() as f64;
assert!(
diversity >= 0.5,
"Routes for source {} are not diverse enough: {:.2}%",
source_idx,
diversity * 100.0
);
}
for (dest_idx, routes) in &dest_route_correlation {
let unique_routes: HashSet<_> = routes.iter().collect();
let diversity = unique_routes.len() as f64 / routes.len() as f64;
assert!(
diversity >= 0.5,
"Routes for destination {} are not diverse enough: {:.2}%",
dest_idx,
diversity * 100.0
);
}
}
#[tokio::test]
async fn test_forward_secrecy() {
let router = Router::new();
let peers: Vec<_> = (0..8).map(|_| PeerId::random()).collect();
for peer in &peers {
router.add_peer(*peer).await;
}
let msg1 = NetworkMessage {
id: "forward_secrecy_test_1".into(),
source: peers[0].to_bytes().to_vec(),
destination: peers[7].to_bytes().to_vec(),
payload: vec![1; 100],
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let msg2 = NetworkMessage {
id: "forward_secrecy_test_2".into(),
source: peers[0].to_bytes().to_vec(),
destination: peers[7].to_bytes().to_vec(),
payload: vec![2; 100],
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let route1 = router
.route(&msg1, RoutingStrategy::Anonymous { hops: 3 })
.await
.unwrap();
let mut hop_info_1 = Vec::new();
for peer_id in &route1 {
let info = router.get_hop_info(peer_id).await.unwrap();
hop_info_1.push(info);
}
let route2 = router
.route(&msg2, RoutingStrategy::Anonymous { hops: 3 })
.await
.unwrap();
let mut hop_info_2 = Vec::new();
for peer_id in &route2 {
let info = router.get_hop_info(peer_id).await.unwrap();
hop_info_2.push(info);
}
for peer_id in &route1 {
if route2.contains(peer_id) {
let old_info = hop_info_1.iter().find(|info| info.peer_id == *peer_id);
let new_info = hop_info_2.iter().find(|info| info.peer_id == *peer_id);
if let (Some(old), Some(new)) = (old_info, new_info) {
let old_knows = old.known_peers.len();
let new_knows = new.known_peers.len();
assert!(old_knows <= 2, "Hop knows too many peers in route 1");
assert!(new_knows <= 2, "Hop knows too many peers in route 2");
}
}
}
}
#[tokio::test]
async fn test_message_unlinkability() {
let router = Router::new();
let peers: Vec<_> = (0..12).map(|_| PeerId::random()).collect();
for peer in &peers {
router.add_peer(*peer).await;
}
let source_peer = peers[0];
let destinations = vec![peers[8], peers[9], peers[10], peers[11]];
let mut routes_by_dest = std::collections::HashMap::new();
for dest_peer in &destinations {
let mut routes = Vec::new();
for i in 0..10 {
let msg = NetworkMessage {
id: format!("unlinkability_test_{}", i),
source: source_peer.to_bytes().to_vec(),
destination: dest_peer.to_bytes().to_vec(),
payload: vec![0; 100],
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let route = router
.route(&msg, RoutingStrategy::Anonymous { hops: 3 })
.await
.unwrap();
routes.push(route);
}
routes_by_dest.insert(*dest_peer, routes);
}
let mut common_peers_counts = Vec::new();
for (dest1, routes1) in &routes_by_dest {
for (dest2, routes2) in &routes_by_dest {
if dest1 >= dest2 {
continue;
}
let mut peers1 = HashSet::new();
let mut peers2 = HashSet::new();
for route in routes1 {
for peer in route {
peers1.insert(*peer);
}
}
for route in routes2 {
for peer in route {
peers2.insert(*peer);
}
}
let common_peers = peers1.intersection(&peers2).count();
common_peers_counts.push(common_peers);
}
}
let avg_common_peers =
common_peers_counts.iter().sum::<usize>() as f64 / common_peers_counts.len() as f64;
let total_available_peers = peers.len() - 5; let overlap_ratio = avg_common_peers / total_available_peers as f64;
println!(
"Average common peers between destination routes: {:.2}",
avg_common_peers
);
println!("Overlap ratio: {:.2}%", overlap_ratio * 100.0);
assert!(
overlap_ratio < 0.7,
"Too much overlap between routes to different destinations: {:.2}%",
overlap_ratio * 100.0
);
}
#[tokio::test]
async fn test_nonce_reuse_vulnerability() {
let mut message_hashes = HashSet::new();
for i in 0..1000 {
let msg = NetworkMessage {
id: format!("nonce_test_{}", i),
source: vec![1, 2, 3, 4],
destination: vec![5, 6, 7, 8],
payload: b"identical payload for nonce test".to_vec(), priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let envelope = MessageEnvelope::new(msg);
if message_hashes.contains(&envelope.hash) {
panic!(
"CRITICAL: Duplicate message hash detected - nonce reuse vulnerability confirmed"
);
}
message_hashes.insert(envelope.hash);
}
println!(
"Nonce uniqueness test passed - {} unique hashes generated",
message_hashes.len()
);
}
#[tokio::test]
async fn test_message_authentication_bypass() {
let msg = NetworkMessage {
id: "auth_bypass_test".into(),
source: vec![1, 2, 3, 4],
destination: vec![5, 6, 7, 8],
payload: b"sensitive payload".to_vec(),
priority: MessagePriority::High,
ttl: Duration::from_secs(60),
};
let mut envelope = MessageEnvelope::new(msg.clone());
assert!(
envelope.verify(),
"Original message should verify correctly"
);
envelope.message.payload = b"tampered payload".to_vec();
if envelope.verify() {
panic!("CRITICAL: Message tampering not detected - authentication bypass vulnerability");
}
let result = envelope.verify_signature(b"fake_public_key");
match result {
Ok(true) => {
panic!("CRITICAL: Unsigned message accepted as valid - signature bypass vulnerability")
}
Ok(false) => println!("Correctly rejected unsigned message"),
Err(_) => println!("Error handling unsigned message verification"),
}
}
#[tokio::test]
async fn test_queue_dos_vulnerability() {
let (queue, _rx) = MessageQueue::new();
let mut messages_accepted = 0;
let max_test_messages = 200_000;
for i in 0..max_test_messages {
let msg = NetworkMessage {
id: format!("dos_test_{}", i),
source: vec![i as u8, (i >> 8) as u8, (i >> 16) as u8, (i >> 24) as u8],
destination: vec![255, 254, 253, 252],
payload: vec![0; 1024], priority: MessagePriority::Low,
ttl: Duration::from_secs(60),
};
match timeout(Duration::from_millis(1), queue.enqueue(msg)).await {
Ok(Ok(_)) => {
messages_accepted += 1;
if i % 10000 == 0 {
let queue_size = queue.len().await;
println!("Queue size after {} messages: {}", i, queue_size);
if queue_size > 150_000 {
panic!(
"CRITICAL: Queue grew unboundedly to {} - DoS vulnerability confirmed",
queue_size
);
}
}
}
Ok(Err(_)) => {
println!("Message rejected at iteration {} - protection working", i);
break;
}
Err(_) => {
println!("Timeout at iteration {} - potential DoS", i);
break;
}
}
}
println!(
"DoS test completed - {} messages accepted before protection kicked in",
messages_accepted
);
if messages_accepted > 150_000 {
panic!("CRITICAL: Too many messages accepted - DoS vulnerability detected");
}
}
#[tokio::test]
async fn test_connection_pool_dos() {
let manager = ConnectionManager::new(10); let mut successful_connections = 0;
for i in 0..100 {
let peer_id = PeerId::random();
match manager.connect(peer_id).await {
Ok(_) => {
successful_connections += 1;
println!("Connection {} accepted", i);
}
Err(_) => {
println!("Connection {} rejected", i);
break;
}
}
if successful_connections > 15 {
panic!(
"CRITICAL: Connection limit not enforced - {} connections accepted",
successful_connections
);
}
}
println!(
"Connection limit test passed - {} connections accepted",
successful_connections
);
}
#[tokio::test]
async fn test_replay_attack_vulnerability() {
let (queue, _rx) = MessageQueue::new();
let msg = NetworkMessage {
id: "replay_test".into(),
source: vec![1, 2, 3, 4],
destination: vec![5, 6, 7, 8],
payload: b"replay test payload".to_vec(),
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
assert!(
queue.enqueue(msg.clone()).await.is_ok(),
"First message should be accepted"
);
let replay_result = queue.enqueue(msg.clone()).await;
match replay_result {
Ok(_) => println!("WARNING: Replay attack not prevented - message accepted twice"),
Err(_) => println!("Replay attack correctly prevented"),
}
let old_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
- 3600;
let mut old_msg = msg.clone();
old_msg.id = "old_message_test".into();
let mut envelope = MessageEnvelope::new(old_msg);
envelope.timestamp = old_timestamp;
let mut hasher = blake3::Hasher::new();
hasher.update(&bincode::serialize(&envelope.message).unwrap());
hasher.update(&envelope.timestamp.to_le_bytes());
envelope.hash = hasher.finalize();
if envelope.verify() {
println!("WARNING: Old message accepted - timestamp validation may be weak");
}
}
#[tokio::test]
async fn test_peer_identity_spoofing() {
let manager = ConnectionManager::new(50);
let legitimate_peer = PeerId::random();
manager.connect(legitimate_peer).await.unwrap();
let spoofed_peer = PeerId::from_bytes(legitimate_peer.to_bytes());
let spoof_result = manager.connect(spoofed_peer).await;
match spoof_result {
Ok(_) => println!("WARNING: Identity spoofing not prevented - same peer ID accepted twice"),
Err(_) => println!("Identity spoofing correctly prevented"),
}
let mut weak_ids = 0;
for _ in 0..1000 {
let peer = PeerId::random();
let bytes = peer.to_bytes();
if bytes.iter().all(|&b| b == 0)
|| bytes.iter().all(|&b| b == 255)
|| bytes.windows(4).any(|w| w == [0, 0, 0, 0])
{
weak_ids += 1;
}
}
if weak_ids > 0 {
panic!(
"CRITICAL: {} weak peer IDs generated out of 1000 - randomness vulnerability",
weak_ids
);
}
println!("Peer ID generation passed entropy test");
}
#[tokio::test]
async fn test_timing_information_leakage() {
let (queue, _rx) = MessageQueue::new();
let mut timing_data = Vec::new();
for size in [100, 1000, 10000, 100000] {
let mut times_for_size = Vec::new();
for i in 0..10 {
let msg = NetworkMessage {
id: format!("timing_test_{}_{}", size, i),
source: vec![1, 2, 3, 4],
destination: vec![5, 6, 7, 8],
payload: vec![0; size],
priority: MessagePriority::Normal,
ttl: Duration::from_secs(60),
};
let start = Instant::now();
let _ = queue.enqueue(msg).await;
let elapsed = start.elapsed();
times_for_size.push(elapsed.as_nanos());
}
let avg_time = times_for_size.iter().sum::<u128>() / times_for_size.len() as u128;
timing_data.push((size, avg_time));
}
println!("Timing analysis by payload size:");
for (size, avg_time) in &timing_data {
println!(" Size: {} bytes, Avg time: {} ns", size, avg_time);
}
let first_time = timing_data[0].1 as f64;
let last_time = timing_data.last().unwrap().1 as f64;
let time_ratio = last_time / first_time;
if time_ratio > 10.0 {
println!("WARNING: Significant timing variation detected - potential information leakage through timing analysis");
}
}
#[tokio::test]
async fn test_metadata_exposure() {
let msg = NetworkMessage {
id: "metadata_test".into(),
source: vec![1, 2, 3, 4],
destination: vec![5, 6, 7, 8],
payload: b"confidential data".to_vec(),
priority: MessagePriority::High,
ttl: Duration::from_secs(60),
};
let serialized = serde_json::to_string(&msg).unwrap();
let mut exposures = Vec::new();
if serialized.contains("confidential data") {
exposures.push("Payload data exposed in serialization");
}
if serialized.len() > 1000 {
exposures.push("Serialized message is very large - potential metadata leakage");
}
let envelope = MessageEnvelope::new(msg);
let timestamp_now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let timestamp_diff = if envelope.timestamp > timestamp_now {
envelope.timestamp - timestamp_now
} else {
timestamp_now - envelope.timestamp
};
if timestamp_diff < 1 {
exposures.push("Timestamp reveals precise message creation time");
}
for exposure in exposures {
println!("WARNING: {}", exposure);
}
}
#[tokio::test]
async fn test_weak_encryption_implementation() {
let mut keys = Vec::new();
for _ in 0..100 {
let peer = PeerId::random();
keys.push(peer.to_bytes());
}
let mut key_set = HashSet::new();
let mut duplicates = 0;
for key in &keys {
if !key_set.insert(key) {
duplicates += 1;
}
}
if duplicates > 0 {
panic!(
"CRITICAL: {} duplicate keys generated - weak randomness in key generation",
duplicates
);
}
let mut byte_frequencies = [0u32; 256];
for key in &keys {
for &byte in key {
byte_frequencies[byte as usize] += 1;
}
}
let expected_freq = (keys.len() * 32) / 256;
let threshold = expected_freq / 3;
let mut biased_bytes = 0;
for (byte_val, &freq) in byte_frequencies.iter().enumerate() {
if freq < threshold || freq > expected_freq + threshold {
biased_bytes += 1;
if freq == 0 || freq > expected_freq * 2 {
println!(
"WARNING: Byte value {} has suspicious frequency: {}",
byte_val, freq
);
}
}
}
if biased_bytes > 50 {
println!(
"WARNING: {} byte values show significant bias - potential weak randomness",
biased_bytes
);
}
println!(
"Encryption entropy test completed - {} keys tested",
keys.len()
);
}
#[tokio::test]
async fn test_large_message_dos() {
let (queue, _rx) = MessageQueue::new();
for size_mb in 1..=20 {
let payload_size = size_mb * 1024 * 1024;
let msg = NetworkMessage {
id: format!("large_msg_test_{}", size_mb),
source: vec![1, 2, 3, 4],
destination: vec![5, 6, 7, 8],
payload: vec![0; payload_size],
priority: MessagePriority::Low,
ttl: Duration::from_secs(60),
};
match timeout(Duration::from_millis(100), queue.enqueue(msg)).await {
Ok(Ok(_)) => {
println!("Accepted {}MB message", size_mb);
if size_mb > 10 {
println!("WARNING: System accepting very large messages ({}MB) - potential DoS vulnerability", size_mb);
}
}
Ok(Err(_)) => {
println!("Rejected {}MB message - protection working", size_mb);
break;
}
Err(_) => {
println!(
"Timeout on {}MB message - system may be overwhelmed",
size_mb
);
break;
}
}
}
}
#[tokio::test]
async fn test_concurrent_attack_scenarios() {
let (queue, _rx) = MessageQueue::new();
let queue = Arc::new(queue);
let manager = Arc::new(ConnectionManager::new(50));
let mut handles = Vec::new();
for task_id in 0..5 {
let queue_clone = queue.clone();
let manager_clone = manager.clone();
let handle = tokio::spawn(async move {
let mut local_stats = (0, 0);
for i in 0..1000 {
if i % 2 == 0 {
let msg = NetworkMessage {
id: format!("concurrent_attack_{}_{}", task_id, i),
source: vec![task_id as u8],
destination: vec![255],
payload: vec![0; 1024],
priority: MessagePriority::Low,
ttl: Duration::from_secs(60),
};
if queue_clone.enqueue(msg).await.is_ok() {
local_stats.0 += 1;
}
} else {
let peer_id = PeerId::random();
if manager_clone.connect(peer_id).await.is_ok() {
local_stats.1 += 1;
}
}
if local_stats.0 > 500 || local_stats.1 > 25 {
break;
}
}
local_stats
});
handles.push(handle);
}
let mut total_messages = 0;
let mut total_connections = 0;
for handle in handles {
let (messages, connections) = handle.await.unwrap();
total_messages += messages;
total_connections += connections;
}
println!(
"Concurrent attack test: {} messages, {} connections accepted",
total_messages, total_connections
);
if total_messages > 10000 {
panic!("CRITICAL: Too many messages accepted under concurrent attack - DoS vulnerability");
}
if total_connections > 100 {
panic!(
"CRITICAL: Too many connections accepted under concurrent attack - DoS vulnerability"
);
}
let final_queue_size = queue.len().await;
let final_connection_count = manager.connection_count().await;
println!(
"Final state: {} queued messages, {} active connections",
final_queue_size, final_connection_count
);
}