use crate::stats::{RealtimeStats, Statistics};
use crate::{Options, fatal};
use http::{HeaderMap, StatusCode, header};
use http::header::HeaderValue;
use crate::client::utils::{build_headers, should_stop};
use reqwest::{Client, ClientBuilder, Request, Result, Url};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
/// Drives one benchmark connection using `reqwest` and returns the statistics it gathered.
///
/// The target is `opts.uri[cid % opts.uri.len()]`. Requests are issued until
/// `should_stop` reports that the run is over. A fresh client is built (and counted
/// via `Statistics::inc_conn`) at start-up, after every failed request, and after
/// every `opts.rpc` requests on the same client.
///
/// # Parameters
/// * `tid` - identifier printed in the "connecting" banner.
/// * `cid` - connection index; selects the target URI and gates the banner.
/// * `opts` - shared benchmark options.
/// * `rt_stats` - realtime counters forwarded to every statistics update.
///
/// # Failure handling
/// An unparsable URI, header/body preparation failures and client construction
/// failures are reported through `fatal!`. Request failures are recorded with
/// `Statistics::set_error` and trigger a reconnect.
pub async fn http_reqwest(
    tid: usize,
    cid: usize,
    opts: Arc<Options>,
    rt_stats: &RealtimeStats,
) -> Statistics {
    let mut statistics = Statistics::new(opts.latency);
    let mut total: u32 = 0;
    let mut banner = HashSet::new();

    let uri_str = opts.uri[cid % opts.uri.len()].as_str();
    let uri = uri_str
        .parse::<hyper::Uri>()
        .unwrap_or_else(|e| fatal!(1, "invalid uri: {e}"));
    let url = Url::parse(uri_str).unwrap_or_else(|e| fatal!(1, "invalid url: {e}"));
    let headers = build_headers(&uri, opts.as_ref())
        .unwrap_or_else(|e| fatal!(2, "could not build headers: {e}"));
    let bodies: Vec<String> = opts.bodies().map_or_else(
        |e| fatal!(2, "could not read body: {e}"),
        |v| {
            v.into_iter()
                .map(|b| String::from_utf8_lossy(&b).to_string())
                .collect()
        },
    );
    // The method never changes during the run; resolve the GET fallback once.
    let method = opts.method.clone().unwrap_or(http::Method::GET);

    let clock = quanta::Clock::new();
    let start = Instant::now();

    'connection: loop {
        if should_stop(total, start, &opts) {
            break 'connection;
        }

        // Print the banner at most once per call, and only when `cid` is a direct
        // index into `opts.uri`.
        if cid < opts.uri.len() && !banner.contains(uri_str) {
            banner.insert(uri_str.to_owned());
            println!(
                "reqwest [{tid:>2}] -> connecting. {} {} {}...",
                method,
                url,
                if opts.http2 { "HTTP/2" } else { "HTTP/1.1" }
            );
        }

        // Single place where a client is created, so every (re)connection is counted.
        let client = build_http_client(opts.as_ref(), &headers)
            .unwrap_or_else(|e| fatal!(4, "could not build reqwest http client: {e}"));
        statistics.inc_conn();

        let mut conn_req_count: u32 = 0;
        loop {
            conn_req_count += 1;
            // The last request allowed on this client asks the peer to close.
            let is_last = conn_req_count >= opts.rpc;

            // Bodies are consumed in order; once exhausted the last one is reused.
            let body = bodies.get(total as usize).or_else(|| bodies.last());

            let mut req = Request::new(method.clone(), url.clone());
            let mut req_headers = headers.clone();
            if is_last {
                req_headers.insert(header::CONNECTION, HeaderValue::from_static("close"));
            }
            *req.headers_mut() = req_headers;
            if let Some(body) = body {
                *req.body_mut() = Some(body.clone().into());
            }

            let start_lat = opts.latency.then_some(clock.raw());
            match client.execute(req).await {
                // NOTE(review): the response body is never read before `res` is
                // dropped; confirm this is intended, as it may affect connection reuse.
                Ok(res) => {
                    let code = res.status();
                    if matches!(code, StatusCode::OK) {
                        statistics.inc_ok(rt_stats);
                    } else {
                        statistics.set_http_status(code, rt_stats);
                    }
                }
                Err(ref err) => {
                    // A failed request still counts towards the total; start over
                    // with a fresh client. No latency sample is recorded.
                    statistics.set_error(err, rt_stats);
                    total += 1;
                    continue 'connection;
                }
            }

            // Latency is stored in microseconds (nanoseconds / 1000).
            if let Some(start_lat) = start_lat
                && let Some(hist) = &mut statistics.latency
            {
                hist.record(clock.delta_as_nanos(start_lat, clock.raw()) / 1000)
                    .ok();
            }

            total += 1;
            if should_stop(total, start, &opts) {
                break 'connection;
            }

            if is_last {
                // Request budget for this client is used up: reconnect through the
                // outer loop so the replacement connection is counted like any other.
                continue 'connection;
            }
        }
    }

    statistics
}
/// Creates a `reqwest` [`Client`] configured from the benchmark options.
///
/// `headers` are cloned and installed as the client's default headers. When
/// `opts.http2` is set, only the HTTP/2 tuning options are applied; otherwise only
/// the HTTP/1 options are applied. No call here forces a protocol version.
///
/// # Errors
/// Returns whatever error `ClientBuilder::build` reports.
pub fn build_http_client(opts: &Options, headers: &HeaderMap) -> Result<Client> {
    let base = ClientBuilder::new().default_headers(headers.clone());

    let configured = if opts.http2 {
        // HTTP/2: flow-control windows, frame size, header list limit, keep-alive.
        let tuned = base
            .http2_adaptive_window(opts.http2_adaptive_window.unwrap_or(false))
            .http2_initial_stream_window_size(opts.http2_initial_stream_window_size)
            .http2_initial_connection_window_size(opts.http2_initial_connection_window_size)
            .http2_max_frame_size(opts.http2_max_frame_size);
        let tuned = match opts.http2_max_header_list_size {
            Some(limit) => tuned.http2_max_header_list_size(limit),
            None => tuned,
        };
        tuned.http2_keep_alive_while_idle(opts.http2_keep_alive_while_idle)
    } else {
        // HTTP/1: each option is only touched when its flag is enabled.
        let h1 = if opts.http1_title_case_headers {
            base.http1_title_case_headers()
        } else {
            base
        };
        let h1 = if opts.http1_allow_obsolete_multiline_headers_in_responses {
            h1.http1_allow_obsolete_multiline_headers_in_responses(true)
        } else {
            h1
        };
        let h1 = if opts.http1_ignore_invalid_headers_in_responses {
            h1.http1_ignore_invalid_headers_in_responses(true)
        } else {
            h1
        };
        let h1 = if opts.http1_allow_spaces_after_header_name_in_responses {
            h1.http1_allow_spaces_after_header_name_in_responses(true)
        } else {
            h1
        };
        if opts.http09_responses {
            h1.http09_responses()
        } else {
            h1
        }
    };

    configured.build()
}