use qudag_protocol::{Coordinator, ProtocolConfig, ProtocolState};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
#[tokio::test]
async fn test_concurrent_message_broadcasting() {
let config = ProtocolConfig::default();
let coordinator = Arc::new(Mutex::new(Coordinator::new(config).await.unwrap()));
{
let mut coord = coordinator.lock().await;
coord.start().await.unwrap();
}
let mut handles = Vec::new();
for i in 0..10 {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
let message = vec![i as u8; 100];
let mut coord = coordinator_clone.lock().await;
coord.broadcast_message(message).await
});
handles.push(handle);
}
for handle in handles {
let result = handle.await.unwrap();
assert!(result.is_ok(), "Concurrent message broadcasting failed");
}
{
let mut coord = coordinator.lock().await;
coord.stop().await.unwrap();
}
}
#[tokio::test]
async fn test_concurrent_state_access() {
let config = ProtocolConfig::default();
let coordinator = Arc::new(Mutex::new(Coordinator::new(config).await.unwrap()));
{
let mut coord = coordinator.lock().await;
coord.start().await.unwrap();
}
let mut handles = Vec::new();
for _ in 0..10 {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
let coord = coordinator_clone.lock().await;
coord.state().await
});
handles.push(handle);
}
for handle in handles {
let state = handle.await.unwrap();
assert_eq!(state, ProtocolState::Running);
}
{
let mut coord = coordinator.lock().await;
coord.stop().await.unwrap();
}
}
#[tokio::test]
async fn test_concurrent_start_stop_operations() {
let config = ProtocolConfig::default();
let coordinator = Arc::new(Mutex::new(Coordinator::new(config).await.unwrap()));
let mut start_handles = Vec::new();
for _ in 0..5 {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
let mut coord = coordinator_clone.lock().await;
coord.start().await
});
start_handles.push(handle);
}
let mut success_count = 0;
for handle in start_handles {
if handle.await.unwrap().is_ok() {
success_count += 1;
}
}
assert!(success_count >= 1, "At least one start operation should succeed");
{
let coord = coordinator.lock().await;
assert_eq!(coord.state().await, ProtocolState::Running);
}
let mut stop_handles = Vec::new();
for _ in 0..5 {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
let mut coord = coordinator_clone.lock().await;
coord.stop().await
});
stop_handles.push(handle);
}
let mut stop_success_count = 0;
for handle in stop_handles {
if handle.await.unwrap().is_ok() {
stop_success_count += 1;
}
}
assert!(stop_success_count >= 1, "At least one stop operation should succeed");
}
#[tokio::test]
async fn test_concurrent_component_access() {
let config = ProtocolConfig::default();
let coordinator = Arc::new(Mutex::new(Coordinator::new(config).await.unwrap()));
{
let mut coord = coordinator.lock().await;
coord.start().await.unwrap();
}
let mut handles = Vec::new();
for _ in 0..10 {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
let coord = coordinator_clone.lock().await;
let _crypto = coord.crypto_manager();
let _network = coord.network_manager();
let _dag = coord.dag_manager();
true
});
handles.push(handle);
}
for handle in handles {
let result = handle.await.unwrap();
assert!(result, "Component access should succeed");
}
{
let mut coord = coordinator.lock().await;
coord.stop().await.unwrap();
}
}
#[tokio::test]
async fn test_high_concurrency_message_processing() {
let config = ProtocolConfig::default();
let coordinator = Arc::new(Mutex::new(Coordinator::new(config).await.unwrap()));
{
let mut coord = coordinator.lock().await;
coord.start().await.unwrap();
}
let mut handles = Vec::new();
let num_tasks = 50;
let messages_per_task = 10;
for task_id in 0..num_tasks {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
let mut results = Vec::new();
for msg_id in 0..messages_per_task {
let message = vec![
task_id as u8,
msg_id as u8,
(task_id + msg_id) as u8
];
let mut coord = coordinator_clone.lock().await;
let result = coord.broadcast_message(message).await;
results.push(result.is_ok());
drop(coord);
tokio::task::yield_now().await;
}
results
});
handles.push(handle);
}
let mut total_messages = 0;
let mut successful_messages = 0;
for handle in handles {
let results = handle.await.unwrap();
total_messages += results.len();
successful_messages += results.iter().filter(|&&success| success).count();
}
println!("Processed {}/{} messages successfully", successful_messages, total_messages);
assert!(successful_messages > 0, "Some messages should be processed successfully");
{
let mut coord = coordinator.lock().await;
coord.stop().await.unwrap();
}
}
#[tokio::test]
async fn test_concurrent_lifecycle_operations() {
let config = ProtocolConfig::default();
let coordinator = Arc::new(Mutex::new(Coordinator::new(config).await.unwrap()));
let start_coordinator = coordinator.clone();
let start_handle = tokio::spawn(async move {
let mut coord = start_coordinator.lock().await;
coord.start().await
});
let op_coordinator = coordinator.clone();
let op_handle = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
let mut results = Vec::new();
for i in 0..5 {
let message = vec![i as u8; 10];
let mut coord = op_coordinator.lock().await;
let result = coord.broadcast_message(message).await;
results.push(result.is_ok());
drop(coord);
sleep(Duration::from_millis(1)).await;
}
results
});
let stop_coordinator = coordinator.clone();
let stop_handle = tokio::spawn(async move {
sleep(Duration::from_millis(50)).await; let mut coord = stop_coordinator.lock().await;
coord.stop().await
});
let start_result = start_handle.await.unwrap();
let op_results = op_handle.await.unwrap();
let stop_result = stop_handle.await.unwrap();
assert!(start_result.is_ok(), "Start should succeed");
assert!(stop_result.is_ok(), "Stop should succeed");
let successful_ops = op_results.iter().filter(|&&success| success).count();
println!("Successful operations during lifecycle: {}/{}", successful_ops, op_results.len());
}
#[tokio::test]
async fn test_thread_safety_invariants() {
let config = ProtocolConfig::default();
let coordinator = Arc::new(Mutex::new(Coordinator::new(config).await.unwrap()));
{
let mut coord = coordinator.lock().await;
coord.start().await.unwrap();
}
let mut handles = Vec::new();
for _ in 0..10 {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
for _ in 0..10 {
let coord = coordinator_clone.lock().await;
let _state = coord.state().await;
drop(coord);
tokio::task::yield_now().await;
}
});
handles.push(handle);
}
for i in 0..10 {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
for j in 0..5 {
let message = vec![i as u8, j as u8];
let mut coord = coordinator_clone.lock().await;
let _result = coord.broadcast_message(message).await;
drop(coord);
tokio::task::yield_now().await;
}
});
handles.push(handle);
}
for _ in 0..5 {
let coordinator_clone = coordinator.clone();
let handle = tokio::spawn(async move {
for _ in 0..10 {
let coord = coordinator_clone.lock().await;
let _crypto = coord.crypto_manager();
let _network = coord.network_manager();
let _dag = coord.dag_manager();
drop(coord);
tokio::task::yield_now().await;
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
{
let coord = coordinator.lock().await;
assert_eq!(coord.state().await, ProtocolState::Running);
}
{
let mut coord = coordinator.lock().await;
coord.stop().await.unwrap();
}
}