use {
axum::{routing::get, Router},
hyper::{Body, Request},
log::info,
std::{
future::IntoFuture,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
},
tokio::{net::TcpStream, sync::oneshot::Sender, task::JoinSet, time::timeout},
tower::ServiceExt,
};
const TEST_SECONDS: u64 = 10;
pub async fn axum_main(port: u16, ready: Sender<()>) {
async fn root() -> &'static str {
tokio::time::sleep(Duration::from_millis(1)).await;
"Hello, World!"
}
let app = Router::new().route("/", get(root));
let listener =
tokio::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))
.await
.unwrap();
info!("Server on port {port} ready");
ready.send(()).unwrap();
let timeout = tokio::time::timeout(
Duration::from_secs(TEST_SECONDS + 1),
axum::serve(listener, app).into_future(),
)
.await;
match timeout {
Ok(v) => {
v.unwrap();
}
Err(_) => {
info!("Terminating server on port {port}");
}
}
}
#[allow(dead_code)]
#[derive(Debug)]
pub struct Stats {
pub latency_s: f32,
pub requests_per_second: f32,
}
pub async fn workload_main(ports: &[u16], tasks: usize) -> anyhow::Result<Stats> {
struct ControlBlock {
start_time: std::time::Instant,
requests: AtomicUsize,
cumulative_latency_us: AtomicUsize,
}
let control_block = Arc::new(ControlBlock {
start_time: std::time::Instant::now(),
requests: AtomicUsize::new(0),
cumulative_latency_us: AtomicUsize::new(0),
});
async fn connection(port: u16, control_block: Arc<ControlBlock>) -> anyhow::Result<()> {
let sa = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
let stream = TcpStream::connect(sa).await?;
let (mut request_sender, connection) = hyper::client::conn::handshake(stream).await?;
tokio::spawn(async move {
let _ = connection.await;
});
let path = "/";
while control_block.start_time.elapsed() < Duration::from_secs(TEST_SECONDS) {
let req = Request::builder()
.uri(path)
.method("GET")
.body(Body::from(""))?;
let start = Instant::now();
let res = timeout(Duration::from_millis(100), request_sender.send_request(req)).await;
let res = match res {
Ok(res) => res?,
Err(_) => {
anyhow::bail!("Timeout on request!")
}
};
let _ = res.body();
if res.status() != 200 {
anyhow::bail!("Got error from server");
}
control_block
.cumulative_latency_us
.fetch_add(start.elapsed().as_micros() as usize, Ordering::Relaxed);
control_block.requests.fetch_add(1, Ordering::Relaxed);
request_sender.ready().await?;
}
Ok(())
}
let mut join_set = JoinSet::new();
for port in ports {
info!("Starting load generation on port {port}");
for _t in 0..tasks {
join_set.spawn(connection(*port, control_block.clone()));
}
}
while let Some(jr) = join_set.join_next().await {
jr??;
}
let requests = control_block.requests.load(Ordering::Relaxed);
let latency_accumulator_us = control_block.cumulative_latency_us.load(Ordering::Relaxed);
Ok(Stats {
requests_per_second: requests as f32 / TEST_SECONDS as f32,
#[allow(clippy::arithmetic_side_effects)]
latency_s: (latency_accumulator_us / requests) as f32 / 1e6,
})
}