use crate::fatal;
use crate::stats::Statistics;
use crate::Options;
use crate::stats::RealtimeStats;
use bytes::Bytes;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use http::Request;
use http::StatusCode;
use tokio::net::TcpStream;
use crate::client::utils::*;
use h2;
pub async fn http_hyper_h2(
tid: usize,
cid: usize,
opts: Arc<Options>,
rt_stats: &RealtimeStats,
) -> Statistics {
let mut statistics = Statistics::new(opts.latency);
let mut total: u32 = 0;
let mut banner = HashSet::new();
let uri_str = opts.uri[cid % opts.uri.len()].as_str();
let uri = uri_str
.parse::<hyper::Uri>()
.unwrap_or_else(|e| fatal!(1, "invalid uri: {e}"));
let (host, port) =
get_conn_address(&opts, &uri).unwrap_or_else(|| fatal!(1, "no host specified in uri"));
let endpoint = build_conn_endpoint(&host, port);
let bodies: Vec<Bytes> = opts
.bodies()
.unwrap_or_else(|e| fatal!(2, "could not read body: {e}"));
let headers = build_headers(&uri, opts.as_ref())
.unwrap_or_else(|e| fatal!(2, "could not build headers: {e}"));
let trailers = build_trailers(opts.as_ref())
.unwrap_or_else(|e| fatal!(2, "could not build trailers: {e}"));
let clock = quanta::Clock::new();
let start = Instant::now();
'connection: loop {
if should_stop(total, start, &opts) {
break 'connection;
}
if cid < opts.uri.len() && !banner.contains(uri_str) {
banner.insert(uri_str.to_owned());
println!(
"hyper-h2 [{tid:>2}] -> connecting to {}:{}, method = {} uri = {} HTTP2...",
host,
port,
opts.method.as_ref().unwrap_or(&http::Method::GET),
uri
);
}
let stream_res = TcpStream::connect(endpoint)
.await
.and_then(|s| s.set_nodelay(true).map(|_| s));
let mut stream = match stream_res {
Ok(s) => s,
Err(ref err) => {
statistics.set_error(err, rt_stats);
total += 1;
continue 'connection;
}
};
let mut h2_builder = h2::client::Builder::new();
if let Some(v) = opts.http2_initial_max_send_streams {
h2_builder.initial_max_send_streams(v);
}
if let Some(v) = opts.http2_max_concurrent_reset_streams {
h2_builder.max_concurrent_reset_streams(v);
}
if let Some(v) = opts.http2_initial_stream_window_size {
h2_builder.initial_window_size(v);
}
if let Some(v) = opts.http2_initial_connection_window_size {
h2_builder.initial_connection_window_size(v);
}
if let Some(v) = opts.http2_max_frame_size {
h2_builder.max_frame_size(v);
}
if let Some(v) = opts.http2_max_header_list_size {
h2_builder.max_header_list_size(v);
}
if let Some(v) = opts.http2_max_send_buffer_size {
h2_builder.max_send_buffer_size(v);
}
let conn = h2_builder.handshake::<_, bytes::Bytes>(stream).await;
let (mut h2_client, mut connection) = match conn {
Ok(h2_conn) => h2_conn,
Err(ref err) => {
statistics.set_error(err, rt_stats);
total += 1;
continue 'connection;
}
};
tokio::task::spawn(async move {
if let Err(err) = connection.await {
eprintln!("error in connection: {}", err)
}
});
statistics.inc_conn();
let mut conn_req_count: u32 = 0;
loop {
conn_req_count += 1;
let is_last = conn_req_count >= opts.rpc;
let body = bodies.get(total as usize).or(bodies.last());
let mut req = Request::new(());
*req.method_mut() = opts.method.clone().unwrap_or(http::Method::GET);
*req.uri_mut() = uri.clone();
*req.headers_mut() = headers.clone();
h2_client = match h2_client.ready().await {
Ok(h2) => h2,
Err(ref err) => {
statistics.set_error(err, rt_stats);
continue 'connection;
}
};
let end_of_stream = body.is_none() && trailers.is_none();
let start_lat = opts.latency.then_some(clock.raw());
let (response, mut send_stream) = match h2_client.send_request(req, end_of_stream) {
Ok(r) => r,
Err(ref err) => {
statistics.set_error(err, rt_stats);
total += 1;
continue 'connection;
}
};
if let Some(body) = body {
let end_of_stream = trailers.is_none();
send_stream.send_data(body.clone(), end_of_stream).unwrap();
}
if let Some(ref tr) = trailers {
send_stream.send_trailers(tr.clone()).unwrap();
}
let res = match response.await {
Ok(res) => res,
Err(ref err) => {
statistics.set_error(err, rt_stats);
total += 1;
continue 'connection;
}
};
match res.status() {
StatusCode::OK => {
statistics.inc_ok(rt_stats);
let (_head, mut body) = res.into_parts();
while let Some(chunk_res) = body.data().await {
let chunk_len = match chunk_res {
Ok(ref c) => c.len(),
Err(ref err) => {
statistics.set_error(err, rt_stats);
total += 1;
continue 'connection;
}
};
let _ = body.flow_control().release_capacity(chunk_len);
}
}
code => statistics.set_http_status(code, rt_stats),
}
if let Some(start_lat) = start_lat
&& let Some(hist) = &mut statistics.latency
{
hist.record(clock.delta_as_nanos(start_lat, clock.raw()) / 1000)
.ok();
};
total += 1;
if should_stop(total, start, &opts) {
break 'connection;
}
if is_last {
let stream_res = TcpStream::connect(endpoint)
.await
.and_then(|s| s.set_nodelay(true).map(|_| s));
stream = match stream_res {
Ok(s) => s,
Err(ref err) => {
statistics.set_error(err, rt_stats);
total += 1;
continue 'connection;
}
};
let mut h2_builder = h2::client::Builder::new();
if let Some(v) = opts.http2_initial_max_send_streams {
h2_builder.initial_max_send_streams(v);
}
if let Some(v) = opts.http2_max_concurrent_reset_streams {
h2_builder.max_concurrent_reset_streams(v);
}
if let Some(v) = opts.http2_initial_stream_window_size {
h2_builder.initial_window_size(v);
}
if let Some(v) = opts.http2_initial_connection_window_size {
h2_builder.initial_connection_window_size(v);
}
if let Some(v) = opts.http2_max_frame_size {
h2_builder.max_frame_size(v);
}
if let Some(v) = opts.http2_max_header_list_size {
h2_builder.max_header_list_size(v);
}
if let Some(v) = opts.http2_max_send_buffer_size {
h2_builder.max_send_buffer_size(v);
}
let conn = h2_builder.handshake::<_, bytes::Bytes>(stream).await;
(h2_client, connection) = match conn {
Ok(h2_conn) => h2_conn,
Err(ref err) => {
statistics.set_error(err, rt_stats);
total += 1;
continue 'connection;
}
};
tokio::task::spawn(async move {
if let Err(err) = connection.await {
eprintln!("error in connection: {}", err)
}
});
conn_req_count = 0;
}
}
}
statistics
}