use hyper::{client::HttpConnector, Body, Client, Method, Request, Uri};
use super::HttpConfig;
use super::TrafficGenerator;
use crate::scenario::Scenario;
use crossbeam_channel::{bounded, Receiver, Sender};
use std::collections::BTreeMap;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
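/// HTTP load generator built on `hyper` with an HTTP/2 (prior-knowledge) client.
///
/// Measured send/error rates and sent-packet counts are published on bounded
/// crossbeam channels so a consumer (e.g. a UI or logger) can poll them while
/// the generator is running.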
pub struct HyperGenerator<S: Scenario> {
pub scenario: S,
pub request: HttpConfig,
pub max_open_connections: usize,
worker_count: usize,
send_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
send_num_packets_channel: (Sender<usize>, Receiver<usize>),
error_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
warmup: bool,
}
impl<T: Scenario> TrafficGenerator<T> for HyperGenerator<T> {
fn run_scenario(&mut self) {
let rt = self.create_runtime();
rt.block_on(self.run_reqwests())
}
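    // Unthrottled mode: pin one OS thread (each running its own single-threaded
    // Tokio runtime) per core, or per configured worker, and aggregate their
    // per-thread success/error rates for the duration of the scenario.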
fn fire_hose(&mut self) {
let mut suc_rates: BTreeMap<usize, f32> = BTreeMap::new();
let mut error_rates: BTreeMap<usize, f32> = BTreeMap::new();
let mut sent = 0;
let core_ids = core_affinity::get_core_ids().unwrap();
let num_cores = if self.worker_count == 0 {
core_ids.len()
} else {
self.worker_count
};
let num_sent_channel: (Sender<usize>, Receiver<usize>) = bounded(num_cores * 2);
let rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) = bounded(num_cores * 2);
let error_rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) =
bounded(num_cores * 2);
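        // Spawn the per-core sender threads; each one gets an equal share of the
        // connection budget and reports back over the bounded channels above.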
        for thread_id in core_ids.into_iter().take(num_cores) {
let fut = fire_other_thread(
self.max_open_connections / num_cores,
self.scenario.duration(),
num_sent_channel.0.clone(),
rate_channel.0.clone(),
error_rate_channel.0.clone(),
thread_id.id,
self.request.clone(),
);
std::thread::spawn(move || {
core_affinity::set_for_current(thread_id);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(fut)
});
}
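        // Poll the per-thread channels, sum the latest rate reported by each thread,
        // and forward the totals on the generator's public channels until the scenario ends.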
let start = Instant::now();
while start.elapsed() < Duration::from_millis(self.scenario.duration() as u64) {
while let Ok(rate) = rate_channel.1.try_recv() {
suc_rates.insert(rate.0, rate.1);
}
while let Ok(rate) = error_rate_channel.1.try_recv() {
error_rates.insert(rate.0, rate.1);
}
while let Ok(s) = num_sent_channel.1.try_recv() {
sent += s;
}
let _ = self.send_num_packets_channel.0.try_send(sent);
let mut tot_suc_rate = 0.;
for (_, r) in suc_rates.iter() {
tot_suc_rate += r;
}
let _ = self
.send_rate_channel
.0
.try_send((start.elapsed().as_millis() as f32, tot_suc_rate));
let mut tot_fail_rate = 0.;
for (_, r) in error_rates.iter() {
tot_fail_rate += r;
}
let _ = self
.error_rate_channel
.0
.try_send((start.elapsed().as_millis() as f32, tot_fail_rate));
std::thread::sleep(Duration::from_millis(50));
}
}
    fn set_scenario(&mut self, scenario: T) {
        self.scenario = scenario;
}
fn get_scenario(&self) -> &T {
&self.scenario
}
fn send_packet(&mut self) {
todo!()
}
fn get_data_rate_channel(&self) -> Receiver<(f32, f32)> {
self.send_rate_channel.1.clone()
}
fn get_sent_packets_channel(&self) -> Receiver<usize> {
self.send_num_packets_channel.1.clone()
}
fn get_error_rate_channel(&self) -> Receiver<(f32, f32)> {
self.error_rate_channel.1.clone()
}
}
impl<S: Scenario> HyperGenerator<S> {
pub fn new(
scenario: S,
request: HttpConfig,
max_open_connections: usize,
worker_count: usize,
warmup: bool,
) -> Self {
HyperGenerator {
scenario,
request,
max_open_connections,
worker_count,
send_rate_channel: bounded(worker_count * 2),
send_num_packets_channel: bounded(worker_count * 2),
error_rate_channel: bounded(worker_count * 2),
warmup,
}
}
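    // A `worker_count` of 0 means "use Tokio's default worker threads" (one per core).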
fn create_runtime(&self) -> tokio::runtime::Runtime {
if self.worker_count == 0 {
tokio::runtime::Runtime::new().unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(self.worker_count)
.enable_all()
.build()
.unwrap()
}
}
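    // Closed-loop mode: spawn requests in batches and adjust the inter-batch delay
    // so that the measured success rate tracks the scenario's target rate over time.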
async fn run_reqwests(&mut self) {
let max_tokio_spawn = self.max_open_connections;
let mut send_rate: f32 = self.scenario.rate(Duration::from_millis(0));
let test_duration = self.scenario.duration();
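        // HTTP/2 over plain TCP with prior knowledge (no TLS/ALPN); the target
        // server must accept h2c connections.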
let client: Client<HttpConnector, Body> = Client::builder()
.pool_idle_timeout(Duration::from_secs(30))
.http2_only(true)
.build_http();
let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
let num_successful = Arc::new(AtomicUsize::new(0));
let num_sent = Arc::new(AtomicUsize::new(0));
let num_fail = Arc::new(AtomicUsize::new(0));
        let mut previous_suc: usize = 0;
        let mut previous_fail: usize = 0;
        let mut previous_time = Instant::now();
        let mut start = Instant::now();
        // Initial inter-batch delay from the scenario's starting rate (assumes at least 1 request/second).
        let mut send_delay = Duration::from_micros(1_000_000 / send_rate as u64);
        let mut has_warmed_up = false;
let uri = Uri::try_from(self.request.url.clone()).unwrap();
let method = Method::try_from(self.request.method.as_str()).unwrap();
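        // Spawn requests in batches of 100, each gated by the connection semaphore,
        // then sleep for `send_delay` to pace the overall request rate.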
while start.elapsed().as_millis() < test_duration {
for _ in 0..100 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let suc_clone = num_successful.clone();
let tot_clone = num_sent.clone();
let fail_clone = num_fail.clone();
let req = Request::builder()
.uri(uri.clone())
.method(method.clone())
.body(hyper::Body::from(self.request.body.clone()))
.unwrap();
let fut = client.request(req);
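                // Count the outcome and release the semaphore permit once the
                // response (or error) has arrived.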
tokio::spawn(async move {
let res = fut.await;
tot_clone.fetch_add(1, Ordering::Relaxed);
match res {
Ok(r) => {
if r.status().is_success() {
suc_clone.fetch_add(1, Ordering::Relaxed);
} else {
fail_clone.fetch_add(1, Ordering::Relaxed);
}
}
Err(_e) => {
fail_clone.fetch_add(1, Ordering::Relaxed);
}
}
drop(permit);
});
}
            std::thread::sleep(send_delay);
            let prev_elapsed = previous_time.elapsed();
            // Re-sample rates and adjust pacing roughly every 100 ms.
            if prev_elapsed.as_millis() > 100 {
send_rate = self.scenario.rate(start.elapsed());
let suc = num_successful.load(Ordering::Relaxed);
                let suc_delta: f64 = suc as f64 - previous_suc as f64;
                // Successful responses per second over the sampling window.
                let suc_rate = suc_delta as u128 * 1_000_000 / prev_elapsed.as_micros();
                // Logistic feedback: shrink the delay when the measured rate lags the
                // target, grow it when the target is exceeded.
                let adjustment = 2. / (1. + (-(suc_rate as f64 - send_rate as f64) / 10000.).exp());
                send_delay =
                    Duration::from_nanos((send_delay.as_nanos() as f64 * adjustment) as u64);
                let fail = num_fail.load(Ordering::Relaxed);
                let fail_delta: f64 = fail as f64 - previous_fail as f64;
                let fail_rate = fail_delta as u128 * 1_000_000 / prev_elapsed.as_micros();
let test_time = start.elapsed().as_millis();
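                // During warm-up, hold off reporting until the feedback loop has settled
                // (adjustment close to 1.0), then restart the test clock.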
if self.warmup && !has_warmed_up {
if (adjustment - 1.).abs() < 0.01 {
has_warmed_up = true;
start = Instant::now();
}
} else {
let _ = self
.send_rate_channel
.0
.try_send((test_time as f32, suc_rate as f32));
let _ = self
.error_rate_channel
.0
.try_send((test_time as f32, fail_rate as f32));
let _ = self.send_num_packets_channel.0.try_send(suc as usize);
}
                previous_suc = suc;
                previous_fail = fail;
                previous_time = Instant::now();
}
}
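        // Wait for all in-flight requests to finish before returning.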
let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
}
}
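/// Per-core sender used by `fire_hose`: issues requests as fast as the semaphore
/// allows for `test_duration` milliseconds and reports success/error rates (tagged
/// with `thread_id`) roughly every 100 ms.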
async fn fire_other_thread(
max_tokio_spawn: usize,
test_duration: u128,
num_packets_channel: Sender<usize>,
packet_rate_channel: Sender<(usize, f32)>,
error_rate_channel: Sender<(usize, f32)>,
thread_id: usize,
rconf: HttpConfig,
) {
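    // Same prior-knowledge HTTP/2 client as the rate-controlled path.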
let client: Client<HttpConnector, Body> = Client::builder()
.pool_idle_timeout(Duration::from_secs(30))
.http2_only(true)
.build_http();
let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
let num_successful = Arc::new(AtomicUsize::new(0));
let num_sent = Arc::new(AtomicUsize::new(0));
let num_fail = Arc::new(AtomicUsize::new(0));
    let mut previous_suc: usize = 0;
    let mut previous_fail: usize = 0;
    let mut previous_time = Instant::now();
let start = Instant::now();
let uri = Uri::try_from(rconf.url).unwrap();
let method = Method::try_from(rconf.method.as_str()).unwrap();
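    // No pacing here: the loop is limited only by the semaphore (open connections)
    // and how fast responses come back.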
while start.elapsed().as_millis() < test_duration {
for _ in 0..100 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let suc_clone = num_successful.clone();
let tot_clone = num_sent.clone();
let fail_clone = num_fail.clone();
let req = Request::builder()
.uri(uri.clone())
.method(method.clone())
.body(hyper::Body::from(rconf.body.clone()))
.unwrap();
let fut = client.request(req);
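            // Count the outcome and release the permit when the response (or error) arrives.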
tokio::spawn(async move {
let res = fut.await;
tot_clone.fetch_add(1, Ordering::Relaxed);
match res {
Ok(r) => {
if r.status().is_success() {
suc_clone.fetch_add(1, Ordering::Relaxed);
} else {
fail_clone.fetch_add(1, Ordering::Relaxed);
}
}
Err(_e) => {
fail_clone.fetch_add(1, Ordering::Relaxed);
}
}
drop(permit);
});
}
        let prev_elapsed = previous_time.elapsed();
        // Report per-thread rates roughly every 100 ms.
        if prev_elapsed.as_millis() > 100 {
            let suc = num_successful.load(Ordering::Relaxed);
            let suc_delta: f64 = suc as f64 - previous_suc as f64;
            let suc_rate = suc_delta as u128 * 1_000_000 / prev_elapsed.as_micros();
            let fail = num_fail.load(Ordering::Relaxed);
            let fail_delta: f64 = fail as f64 - previous_fail as f64;
            let fail_rate = fail_delta as u128 * 1_000_000 / prev_elapsed.as_micros();
            let _ = packet_rate_channel.try_send((thread_id, suc_rate as f32));
            let _ = error_rate_channel.try_send((thread_id, fail_rate as f32));
            // This channel carries deltas; the aggregator sums them into a running total.
            let _ = num_packets_channel.try_send(suc_delta as usize);
            previous_suc = suc;
            previous_fail = fail;
            previous_time = Instant::now();
}
}
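    // Wait for all in-flight requests to finish before returning.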
let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
}