use crate::endpoint::Endpoint;
use crate::health::health_client::HealthClient;
use crate::health::HealthCheckRequest;
use crate::report::EndpointResult;
use futures::future::join_all;
use std::time::Instant;
use tonic::metadata::{Ascii, MetadataKey, MetadataValue};
use tonic::transport::{Channel, ClientTlsConfig};
use tonic::Request;
pub struct BenchConfig {
pub requests: u32,
pub concurrency: u32,
pub warmup:u32,
}
impl Default for BenchConfig {
fn default() -> Self {
Self {
requests: 100,
concurrency: 10,
warmup:10,
}
}
}
pub async fn run(endpoints: Vec<Endpoint>, config: BenchConfig) -> Vec<EndpointResult> {
let mut results = vec![];
for endpoint in &endpoints {
results.push(bench_endpoint(endpoint, &config).await);
}
results
}
async fn bench_endpoint(endpoint: &Endpoint, config: &BenchConfig) -> EndpointResult {
let mut latencies = vec![];
let mut errors = 0u32;
let mut channel_builder = Channel::from_shared(endpoint.url.clone())
.expect("invalid url");
if endpoint.url.starts_with("https://") {
channel_builder = channel_builder
.tls_config(ClientTlsConfig::new().with_native_roots())
.expect("tls config failed");
}
let channel = channel_builder.connect().await;
match channel {
Err(e) => {
eprintln!("[{}] connection failed: {}", endpoint.name, e);
errors = config.requests;
}
Ok(ch) => {
let chunks = config.requests / config.concurrency;
let remainder = config.requests % config.concurrency;
for chunk in 0..chunks {
let batch_size = if chunk == chunks - 1 {
config.concurrency + remainder
} else {
config.concurrency
};
let futures: Vec<_> = (0..batch_size)
.map(|_| send_request(ch.clone(), endpoint))
.collect();
let batch_results = join_all(futures).await;
for result in batch_results {
match result {
Ok(d) => latencies.push(d),
Err(_) => errors += 1,
}
}
}
}
}
EndpointResult {
name: endpoint.name.clone(),
latencies,
errors,
total_requests: config.requests,
}
}
async fn send_request(
ch: Channel,
endpoint: &Endpoint,
) -> Result<std::time::Duration, tonic::Status> {
let mut client = HealthClient::new(ch);
let mut req = Request::new(HealthCheckRequest {
service: String::new(),
});
for (key, value) in &endpoint.headers {
req.metadata_mut().insert(
MetadataKey::<Ascii>::from_bytes(key.as_bytes()).expect("invalid header name"),
MetadataValue::<Ascii>::try_from(value.as_str()).expect("invalid header value"),
);
}
let start = Instant::now();
client.check(req).await?;
Ok(start.elapsed())
}