use hdrhistogram::Histogram;
use reqwest::ClientBuilder;
use std::collections::HashMap;
use std::io::Read;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
pub mod error;
pub use error::{InquisitorError, Result};
pub mod config;
pub use config::{Config, Method};
pub mod time;
use time::Microseconds;
pub const MAX_CONNS: usize = 12;
pub fn run<C: Into<Config>>(config: C) -> Result<()> {
let config: Config = config.into();
let should_exit = Arc::new(AtomicBool::new(false));
let should_exit_clone = should_exit.clone();
ctrlc::set_handler(move || {
let previously_set = should_exit_clone.fetch_or(true, Ordering::SeqCst);
if previously_set {
std::process::exit(130);
}
})?;
let (iterations, duration) = config.iterations_and_duration();
let mut headers = HashMap::new();
for header in config.header {
if let Some((k, v)) = header.split_once(':') {
headers.insert(k.to_string(), v.to_string());
}
}
let times = Arc::new(Mutex::new(Histogram::<u64>::new_with_max(
1_000_000_000_000,
3,
)?));
let passes = Arc::new(AtomicUsize::new(0));
let errors = Arc::new(AtomicUsize::new(0));
let test_start_time = std::time::SystemTime::now();
let failed_regex = match config.failed_body {
Some(regex) => Some(regex::Regex::new(®ex)?),
None => None,
};
let request_body = Box::leak(Box::new(config.request_body)) as &Option<_>;
let mut handles = Vec::new();
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.build()?;
let mut cert = None;
if let Some(cert_file) = config.ca_cert.as_deref() {
let mut buf = Vec::new();
std::fs::File::open(cert_file)
.map_err(|e| InquisitorError::CouldNotOpenFile(e, cert_file.to_owned()))?
.read_to_end(&mut buf)
.map_err(|e| InquisitorError::CouldNotReadFile(e, cert_file.to_owned()))?;
cert = Some(
reqwest::Certificate::from_pem(&buf)
.map_err(|e| InquisitorError::CouldNotConvertCert(e, cert_file.to_owned()))?,
);
}
for _ in 0..config.connections {
let mut client = ClientBuilder::new().danger_accept_invalid_certs(config.insecure);
if let Some(cert) = cert.clone() {
client = client.add_root_certificate(cert);
}
let client = client
.build()
.map_err(InquisitorError::FailedToCreateClient)?;
let passes = passes.clone();
let errors = errors.clone();
let url = config.url.clone();
let headers = headers.clone();
let failed_regex = failed_regex.clone();
let times = times.clone();
let should_exit = should_exit.clone();
let task = rt.spawn(async move {
let mut total = passes.load(Ordering::Relaxed) + errors.load(Ordering::Relaxed);
let mut total_elapsed = test_start_time.elapsed().unwrap().as_micros() as u64;
while total < iterations && total_elapsed < duration {
if should_exit.load(Ordering::Relaxed) {
break;
}
let mut builder = match config.method {
Method::Get => client.get(&url),
Method::Post => client.post(&url),
};
if let Some(body) = request_body.as_deref() {
builder = builder.body(body);
}
for (k, v) in &headers {
builder = builder.header(k, v);
}
let req_start_time = std::time::SystemTime::now();
let response = builder.send().await;
let elapsed = req_start_time.elapsed().unwrap().as_micros() as u64;
times
.lock()
.await
.record(elapsed)
.expect("time out of bounds");
match response {
Ok(res) if res.status().is_success() && failed_regex.is_none() => {
passes.fetch_add(1, Ordering::SeqCst);
if config.print_response {
println!(
"Response successful. Content: {}",
res.text().await.unwrap()
);
}
}
Ok(res) if res.status().is_success() && failed_regex.is_some() => {
let body = res.text().await.unwrap();
if failed_regex.as_ref().unwrap().is_match(&body) {
if !config.hide_errors {
eprintln!("Response is 200 but body indicates an error: {}", body);
}
errors.fetch_add(1, Ordering::SeqCst);
} else {
passes.fetch_add(1, Ordering::SeqCst);
if config.print_response {
println!("Response successful. Contents: {}", body);
}
}
}
Ok(res) => {
if !config.hide_errors {
eprintln!("Response is not 200. Status code: {}", res.status());
}
errors.fetch_add(1, Ordering::SeqCst);
}
Err(e) => {
if !config.hide_errors {
eprintln!("Request failed: {}", e);
}
errors.fetch_add(1, Ordering::SeqCst);
}
};
total = passes.load(Ordering::Relaxed) + errors.load(Ordering::Relaxed);
total_elapsed = test_start_time.elapsed().unwrap().as_micros() as u64;
}
});
handles.push(task);
}
let times = rt.block_on(async {
futures::future::join_all(handles).await;
Arc::try_unwrap(times)
.expect("bug: could not unwrap Arc")
.into_inner()
});
let elapsed_us = test_start_time
.elapsed()
.map_err(InquisitorError::FailedToGetTimeInterval)?
.as_micros() as f64;
print_results(
times,
elapsed_us,
errors.load(Ordering::Relaxed),
passes.load(Ordering::Relaxed),
);
Ok(())
}
fn print_results(times: Histogram<u64>, elapsed_us: f64, errors: usize, passes: usize) {
let iterations = passes + errors;
let rps = (iterations as f64 / (elapsed_us / 1_000_000.0)) as usize;
println!("total time: {}", Microseconds(elapsed_us));
print!("errors: {}/{}", errors, iterations);
if errors > 0 {
println!(" ({:.2}%)", (errors as f64 / iterations as f64) * 100.0);
} else {
println!();
}
println!("throughput: {} req./s", rps,);
println!(
"response times:\n\tmean\t{}\n\tst.dev\t{}\n\tmin\t{}\n\tmax\t{}",
Microseconds(times.mean()),
Microseconds(times.stdev()),
Microseconds(times.min() as f64),
Microseconds(times.max() as f64),
);
println!(
"latencies:\n\t50%\t{}\n\t75%\t{}\n\t90%\t{}\n\t95%\t{}\n\t99%\t{}\n\t99.9%\t{}",
Microseconds(times.value_at_quantile(0.5) as f64),
Microseconds(times.value_at_quantile(0.75) as f64),
Microseconds(times.value_at_quantile(0.9) as f64),
Microseconds(times.value_at_quantile(0.95) as f64),
Microseconds(times.value_at_quantile(0.99) as f64),
Microseconds(times.value_at_quantile(0.999) as f64),
);
}