use network_protocol::protocol::dispatcher::Dispatcher;
use network_protocol::protocol::message::Message;
use network_protocol::service::client::Client;
use network_protocol::utils::logging::{init_logging, LogConfig};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tracing::{info, warn};
use serial_test::serial;
const NUM_CLIENTS: usize = 4; const MESSAGES_PER_CLIENT: usize = 20; const SERVER_PORT: u16 = 9936;
#[serial]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_server_backpressure_under_load() {
init_logging(&LogConfig::default());
info!("Starting backpressure test");
let daemon_handle = tokio::spawn(async move {
let addr = format!("127.0.0.1:{SERVER_PORT}");
let _dispatcher = Arc::new(Dispatcher::default());
info!("Starting daemon on {}", addr);
if let Err(e) = network_protocol::service::daemon::start(&addr).await {
warn!("Daemon error: {}", e);
}
});
tokio::time::sleep(Duration::from_millis(500)).await;
let barrier = Arc::new(Barrier::new(NUM_CLIENTS));
let mut client_handles = Vec::new();
for client_id in 0..NUM_CLIENTS {
let barrier_clone = barrier.clone();
let client_handle = tokio::spawn(async move {
let addr = format!("127.0.0.1:{SERVER_PORT}");
let mut client =
match tokio::time::timeout(Duration::from_secs(5), Client::connect(&addr)).await {
Ok(result) => match result {
Ok(client) => client,
Err(e) => {
warn!("Client {} failed to connect: {}", client_id, e);
return;
}
},
Err(_) => {
warn!("Client {} connection timed out", client_id);
return;
}
};
barrier_clone.wait().await;
info!("Client {} starting message flood", client_id);
for i in 0..MESSAGES_PER_CLIENT {
let message = Message::Ping;
match tokio::time::timeout(Duration::from_secs(5), client.send(message)).await {
Ok(result) => match result {
Ok(_) => {
if i % 10 == 0 {
info!(
"Client {} sent message {}/{}",
client_id, i, MESSAGES_PER_CLIENT
);
}
}
Err(e) => {
warn!("Client {} failed to send message {}: {}", client_id, i, e);
}
},
Err(_) => {
warn!(
"Client {} send operation timed out for message {}",
client_id, i
);
}
}
}
info!("Client {} completed sending all messages", client_id);
});
client_handles.push(client_handle);
}
for (i, handle) in client_handles.into_iter().enumerate() {
match tokio::time::timeout(Duration::from_secs(10), handle).await {
Ok(result) => match result {
Ok(_) => info!("Client {} finished", i),
Err(e) => warn!("Client {} task failed: {:?}", i, e),
},
Err(_) => warn!("Client {} timed out after 10 seconds", i),
}
}
tokio::time::sleep(Duration::from_millis(500)).await;
daemon_handle.abort();
info!("Backpressure test completed successfully");
}
#[serial]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_slow_client_isolation() {
init_logging(&LogConfig::default());
info!("Starting slow client isolation test");
let slow_test_port = SERVER_PORT + 1;
let daemon_handle = tokio::spawn(async move {
let addr = format!("127.0.0.1:{slow_test_port}");
let _dispatcher = Arc::new(Dispatcher::default());
info!("Starting daemon on {}", addr);
if let Err(e) = network_protocol::service::daemon::start(&addr).await {
warn!("Daemon error: {}", e);
}
});
tokio::time::sleep(Duration::from_millis(500)).await;
let slow_client_handle = tokio::spawn(async move {
let addr = format!("127.0.0.1:{slow_test_port}");
let mut client =
match tokio::time::timeout(Duration::from_secs(5), Client::connect(&addr)).await {
Ok(result) => match result {
Ok(client) => client,
Err(e) => {
warn!("Slow client failed to connect: {}", e);
return;
}
},
Err(_) => {
warn!("Slow client connection timed out");
return;
}
};
info!("Slow client connected, sending messages slowly");
for i in 0..10 {
let message = Message::Ping;
match tokio::time::timeout(Duration::from_secs(5), client.send(message)).await {
Ok(result) => match result {
Ok(_) => {
info!("Slow client sent message {}/10", i);
}
Err(e) => {
warn!("Slow client failed to send message {}: {}", i, e);
}
},
Err(_) => {
warn!("Slow client send operation timed out for message {}", i);
continue;
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
info!("Slow client completed sending all messages");
});
tokio::time::sleep(Duration::from_millis(300)).await;
let mut fast_client_handles = Vec::new();
for client_id in 0..3 {
let client_handle = tokio::spawn(async move {
let addr = format!("127.0.0.1:{slow_test_port}");
let mut client =
match tokio::time::timeout(Duration::from_secs(5), Client::connect(&addr)).await {
Ok(result) => match result {
Ok(client) => client,
Err(e) => {
warn!("Fast client {} failed to connect: {}", client_id, e);
return;
}
},
Err(_) => {
warn!("Fast client {} connection timed out", client_id);
return;
}
};
info!(
"Fast client {} connected, sending messages quickly",
client_id
);
let start = std::time::Instant::now();
for i in 0..15 {
let message = Message::Ping;
match tokio::time::timeout(Duration::from_secs(5), client.send(message)).await {
Ok(result) => match result {
Ok(_) => {
if i % 5 == 0 {
info!("Fast client {} sent message {}/15", client_id, i);
}
}
Err(e) => {
warn!(
"Fast client {} failed to send message {}: {}",
client_id, i, e
);
}
},
Err(_) => {
warn!(
"Fast client {} send operation timed out for message {}",
client_id, i
);
}
}
}
let elapsed = start.elapsed();
info!("Fast client {} completed in {:?}", client_id, elapsed);
});
fast_client_handles.push(client_handle);
}
for (i, handle) in fast_client_handles.into_iter().enumerate() {
match tokio::time::timeout(Duration::from_secs(10), handle).await {
Ok(result) => match result {
Ok(_) => info!("Fast client {} finished", i),
Err(e) => warn!("Fast client {} task failed: {:?}", i, e),
},
Err(_) => warn!("Fast client {} timed out after 10 seconds", i),
}
}
match tokio::time::timeout(Duration::from_secs(20), slow_client_handle).await {
Ok(result) => match result {
Ok(_) => info!("Slow client finished"),
Err(e) => warn!("Slow client task failed: {:?}", e),
},
Err(_) => warn!("Slow client timed out after 20 seconds"),
}
daemon_handle.abort();
info!("Slow client isolation test completed successfully");
}