use crate::network::NetworkManager;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
/// Spawns ten tasks against one shared `NetworkManager`. Each task calls
/// `peer_manager()`, sleeps 10 ms while still holding the returned value,
/// checks that `peer_count()` is zero and then drops it. Every task must
/// join without error.
#[tokio::test]
async fn test_concurrent_mutex_access() {
    let manager = Arc::new(NetworkManager::new("127.0.0.1:8333".parse().unwrap()));

    let tasks: Vec<_> = (0..10)
        .map(|task_id| {
            let shared = Arc::clone(&manager);
            tokio::spawn(async move {
                let peers = shared.peer_manager().await;
                // Sleep with `peers` still alive so the tasks overlap in time.
                tokio::time::sleep(Duration::from_millis(10)).await;
                assert_eq!(peers.peer_count(), 0);
                drop(peers);
                task_id
            })
        })
        .collect();

    // A panic inside a task surfaces here as a `JoinError`.
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .for_each(|outcome| {
            assert!(outcome.is_ok(), "Task should complete without deadlock");
        });
}
/// Acquires the value returned by `peer_manager()` in an inner scope, lets it
/// drop, sleeps 100 ms, then acquires it a second time. The whole sequence
/// must finish within one second.
///
/// Both failure layers are checked: the outer `timeout` result (the task did
/// not finish in time) and the inner join result (the task panicked).
#[tokio::test]
async fn test_no_lock_held_across_await() {
    let manager = Arc::new(NetworkManager::new("127.0.0.1:8333".parse().unwrap()));
    let manager_clone = Arc::clone(&manager);

    let handle = tokio::spawn(async move {
        {
            // Scoped so the first acquisition is released before sleeping.
            let _pm = manager_clone.peer_manager().await;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
        let pm = manager_clone.peer_manager().await;
        drop(pm);
    });

    // `timeout` over a `JoinHandle` yields `Result<Result<(), JoinError>, Elapsed>`.
    // Checking only the outer layer would let a panic inside the task pass
    // silently, so unwrap both.
    let join_result = timeout(Duration::from_secs(1), handle)
        .await
        .expect("Should complete without deadlock");
    join_result.expect("Spawned task should not panic");
}
/// Spawns a single task that calls `peer_manager()` once and requires it to
/// finish within one second.
///
/// Both the timeout and the task's own join result are verified, so a panic
/// inside the task fails the test instead of being reported as success.
#[tokio::test]
async fn test_lock_ordering() {
    let manager = Arc::new(NetworkManager::new("127.0.0.1:8333".parse().unwrap()));
    let manager_clone = Arc::clone(&manager);

    let handle = tokio::spawn(async move {
        let _pm = manager_clone.peer_manager().await;
    });

    // Outer `Err` = timed out; inner `Err` = the task panicked or was cancelled.
    let join_result = timeout(Duration::from_secs(1), handle)
        .await
        .expect("Lock ordering should prevent deadlocks");
    join_result.expect("Spawned task should not panic");
}
/// Stress variant: fifty tasks each hold the value returned by
/// `peer_manager()` across a 1 ms sleep, release it, and then call
/// `bytes_sent()`. All fifty tasks must join without error.
#[tokio::test]
async fn test_concurrent_operations_stress() {
    let manager = Arc::new(NetworkManager::new("127.0.0.1:8333".parse().unwrap()));

    let mut workers = Vec::new();
    for _ in 0..50 {
        let shared = Arc::clone(&manager);
        workers.push(tokio::spawn(async move {
            let held = shared.peer_manager().await;
            tokio::time::sleep(Duration::from_millis(1)).await;
            // Release before touching the byte counter, as the scoped block did.
            drop(held);
            let _bytes_sent = shared.bytes_sent();
        }));
    }

    for outcome in futures::future::join_all(workers).await {
        assert!(outcome.is_ok(), "Concurrent operations should succeed");
    }
}
/// Fires twenty concurrent `send_to_peer` calls at the same address with a
/// ten-byte zero payload. The result of each send is deliberately ignored;
/// the test only requires that every task joins without error.
#[tokio::test]
async fn test_concurrent_rate_limiting() {
    let manager = Arc::new(NetworkManager::new("127.0.0.1:8333".parse().unwrap()));
    let target: SocketAddr = "127.0.0.1:8080".parse().unwrap();

    let senders: Vec<_> = (0..20)
        .map(|_| {
            let shared = Arc::clone(&manager);
            // `SocketAddr` is `Copy`; each task gets its own copy.
            let peer = target;
            tokio::spawn(async move {
                let _ = shared.send_to_peer(peer, vec![0u8; 10]).await;
            })
        })
        .collect();

    futures::future::join_all(senders)
        .await
        .into_iter()
        .for_each(|outcome| {
            assert!(outcome.is_ok(), "Rate limiting should not cause deadlocks");
        });
}
/// One task holds the value returned by `peer_manager()` for `HOLD`, while a
/// second task tries to obtain it within `WAIT_LIMIT`. The second task must
/// succeed once the first one lets go.
///
/// NOTE(review): `peer_manager()` presumably returns an exclusive lock guard;
/// confirm against `NetworkManager`. Under that assumption the waiter's limit
/// must exceed the hold time, otherwise the inner assertion is bound to fail.
/// The previous 100 ms limit was shorter than the 500 ms hold, and the failure
/// went unnoticed only because the join results were discarded.
#[tokio::test]
async fn test_lock_timeout() {
    /// How long the first task keeps the acquired value alive.
    const HOLD: Duration = Duration::from_millis(500);
    /// Upper bound on how long the second task may wait; comfortably > `HOLD`.
    const WAIT_LIMIT: Duration = Duration::from_secs(2);

    let manager = Arc::new(NetworkManager::new("127.0.0.1:8333".parse().unwrap()));

    let holder = Arc::clone(&manager);
    let long_task = tokio::spawn(async move {
        let _pm = holder.peer_manager().await;
        tokio::time::sleep(HOLD).await;
    });

    let waiter = Arc::clone(&manager);
    let timeout_task = tokio::spawn(async move {
        let result = timeout(WAIT_LIMIT, async { waiter.peer_manager().await }).await;
        assert!(result.is_ok(), "Lock should be acquired eventually");
    });

    // Check both join results: a panic inside a spawned task (including the
    // assertion above) only reaches the test through its `JoinError`.
    let (long_result, timeout_result) = tokio::join!(long_task, timeout_task);
    assert!(long_result.is_ok(), "Lock-holding task should complete");
    assert!(
        timeout_result.is_ok(),
        "Waiting task should acquire the lock before its timeout"
    );
}
/// Ten tasks each build a distinct loopback address (ports 8080..=8089) and
/// read `peer_count()` through `peer_manager()`. The address and the count
/// are not used further; the test only requires every task to join cleanly.
#[tokio::test]
async fn test_concurrent_peer_operations() {
    let manager = Arc::new(NetworkManager::new("127.0.0.1:8333".parse().unwrap()));

    let mut tasks = Vec::new();
    for offset in 0..10 {
        let shared = Arc::clone(&manager);
        tasks.push(tokio::spawn(async move {
            let port = 8080 + offset;
            let _addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
            // The temporary returned by `peer_manager()` is dropped at the end
            // of this statement, right after the count has been read.
            let _count = shared.peer_manager().await.peer_count();
        }));
    }

    futures::future::join_all(tasks)
        .await
        .into_iter()
        .for_each(|outcome| {
            assert!(outcome.is_ok(), "Concurrent peer operations should succeed");
        });
}
/// Runs a sender task and a receiver task side by side and requires both to
/// join without error.
///
/// The sender issues 100 `send_to_peer` calls, cycling over ports 8080..=8089
/// with a 1 ms pause between calls and ignoring each result. The receiver
/// polls an unbounded channel with a 100 ms timeout per receive and stops
/// after 100 receives or on the first timeout.
///
/// NOTE(review): nothing in this test ever sends on the channel, and `_tx` is
/// kept alive until the end of the function, so the receiver leaves its loop
/// on the first 100 ms timeout. Also, a `None` from `recv()` (closed channel)
/// would be counted like a message. Confirm whether the channel was meant to
/// be wired into the manager.
#[tokio::test]
async fn test_concurrent_message_processing() {
    // `_tx` (not `_`) so the sending half stays alive for the whole test.
    let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
    let manager = Arc::new(NetworkManager::new("127.0.0.1:8333".parse().unwrap()));

    let shared = Arc::clone(&manager);
    let sender_task = tokio::spawn(async move {
        for i in 0..100 {
            let port = 8080 + (i % 10);
            let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
            let _ = shared.send_to_peer(addr, vec![0u8; 10]).await;
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
    });

    let receiver_task = tokio::spawn(async move {
        let mut received = 0;
        loop {
            let timed_out = timeout(Duration::from_millis(100), rx.recv())
                .await
                .is_err();
            if timed_out {
                break;
            }
            received += 1;
            if received >= 100 {
                break;
            }
        }
    });

    let (sender_result, receiver_result) = tokio::join!(sender_task, receiver_task);
    assert!(sender_result.is_ok(), "Sender should complete");
    assert!(receiver_result.is_ok(), "Receiver should complete");
}