use crate::generator::TrafficGenerator;
use crate::scenario::Scenario;
use crossbeam_channel::{bounded, Receiver, Sender};
use reqwest;
use std::time::{Duration, Instant};
use std::{
collections::BTreeMap,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::sync::Semaphore;
use tokio::{self, runtime::Runtime};
/// HTTP traffic generator backed by `reqwest`, driven by a [`Scenario`].
///
/// Measurement samples are published over internal crossbeam channels;
/// consumers obtain the receiving ends via the `get_*_channel` accessors.
pub struct ReqwestGenerator<S: Scenario> {
    // The scenario providing the target rate over time and the test duration.
    pub scenario: S,
    // Upper bound on concurrently in-flight requests (enforced by semaphore).
    pub max_open_connections: usize,
    // Number of worker threads; 0 means "pick a default" (runtime default /
    // one per core, depending on the mode used).
    worker_count: usize,
    // When true, measurement starts only after the send rate has converged
    // on the scenario's target.
    pub warmup: bool,
    // Request template; must be cloneable via `Request::try_clone`
    // (i.e. no streaming body).
    pub req: reqwest::Request,
    // (elapsed ms, successes/sec) samples.
    send_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
    // Cumulative count of packets sent.
    send_num_packets_channel: (Sender<usize>, Receiver<usize>),
    // (elapsed ms, errors/sec) samples.
    error_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
}
impl<T: Scenario> TrafficGenerator<T> for ReqwestGenerator<T> {
    /// Runs the configured scenario to completion on a tokio runtime,
    /// throttling toward the scenario's target rate (see `run_reqwests`).
    fn run_scenario(&mut self) {
        let rt = self.create_runtime();
        rt.block_on(self.run_reqwests())
    }

    /// Sends requests as fast as possible ("fire hose") using one
    /// single-threaded runtime per worker, each pinned to its own core.
    ///
    /// Workers report per-interval success/error rates keyed by core id;
    /// this thread aggregates them and publishes totals on the generator's
    /// channels roughly every 50 ms until the scenario's duration elapses.
    fn fire_hose(&mut self) {
        // Latest sample per core id; BTreeMap keeps exactly one entry per worker.
        let mut suc_rates: BTreeMap<usize, f32> = BTreeMap::new();
        let mut error_rates: BTreeMap<usize, f32> = BTreeMap::new();
        let mut sent = 0;
        let core_ids = core_affinity::get_core_ids().unwrap();
        // worker_count == 0 means "one worker per core".
        let num_cores = if self.worker_count == 0 {
            core_ids.len()
        } else {
            self.worker_count
        };
        let num_sent_channel: (Sender<usize>, Receiver<usize>) = bounded(num_cores * 2);
        let rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) = bounded(num_cores * 2);
        let error_rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) =
            bounded(num_cores * 2);
        // `take(num_cores)` caps the worker count *before* spawning. The old
        // post-spawn `if i >= num_cores { break; }` check was off by one and
        // spawned an extra thread, exceeding `max_open_connections` overall.
        for thread_id in core_ids.into_iter().take(num_cores) {
            // Split the connection budget evenly across workers.
            let fut = fire_other_thread(
                self.max_open_connections / num_cores,
                self.scenario.duration(),
                self.req.try_clone().unwrap(),
                num_sent_channel.0.clone(),
                rate_channel.0.clone(),
                error_rate_channel.0.clone(),
                thread_id.id,
            );
            std::thread::spawn(move || {
                // Pin the worker to its core, then drive the future on a
                // current-thread runtime so all its work stays on that core.
                core_affinity::set_for_current(thread_id);
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(fut)
            });
        }
        let start = Instant::now();
        while start.elapsed() < Duration::from_millis(self.scenario.duration() as u64) {
            // Drain every pending worker report before publishing aggregates.
            while let Ok(rate) = rate_channel.1.try_recv() {
                suc_rates.insert(rate.0, rate.1);
            }
            while let Ok(rate) = error_rate_channel.1.try_recv() {
                error_rates.insert(rate.0, rate.1);
            }
            // Workers send per-interval deltas; accumulate them into a total.
            while let Ok(s) = num_sent_channel.1.try_recv() {
                sent += s;
            }
            // try_send everywhere: a slow or absent consumer must not stall us.
            let _ = self.send_num_packets_channel.0.try_send(sent);
            let tot_suc_rate: f32 = suc_rates.values().sum();
            let _ = self
                .send_rate_channel
                .0
                .try_send((start.elapsed().as_millis() as f32, tot_suc_rate));
            let tot_fail_rate: f32 = error_rates.values().sum();
            let _ = self
                .error_rate_channel
                .0
                .try_send((start.elapsed().as_millis() as f32, tot_fail_rate));
            std::thread::sleep(Duration::from_millis(50));
        }
    }

    /// Replaces the current scenario.
    fn set_scenario(&mut self, schem: T) {
        self.scenario = schem;
    }

    /// Returns a reference to the current scenario.
    fn get_scenario(&self) -> &T {
        &self.scenario
    }

    /// Sends one blocking GET to the configured URL, discarding the result.
    fn send_packet(&mut self) {
        let client = reqwest::blocking::Client::new();
        let _ = client.get(self.req.url().clone()).send();
    }

    /// Receiver of (elapsed ms, successes/sec) samples.
    fn get_data_rate_channel(&self) -> Receiver<(f32, f32)> {
        self.send_rate_channel.1.clone()
    }

    /// Receiver of cumulative sent-packet counts.
    fn get_sent_packets_channel(&self) -> Receiver<usize> {
        self.send_num_packets_channel.1.clone()
    }

    /// Receiver of (elapsed ms, errors/sec) samples.
    fn get_error_rate_channel(&self) -> Receiver<(f32, f32)> {
        self.error_rate_channel.1.clone()
    }
}
impl<S: Scenario> ReqwestGenerator<S> {
    /// Creates a new generator.
    ///
    /// * `max_open_connections` — cap on concurrently in-flight requests.
    /// * `worker_count` — tokio worker threads for `run_scenario`; 0 means
    ///   "use the runtime default".
    /// * `warmup` — when true, the measured window only starts once the send
    ///   rate has converged on the scenario's target.
    /// * `req` — request template; must be cloneable (`Request::try_clone`
    ///   must succeed, i.e. no streaming body).
    pub fn new(
        scenario: S,
        max_open_connections: usize,
        worker_count: usize,
        warmup: bool,
        req: reqwest::Request,
    ) -> Self {
        ReqwestGenerator {
            scenario,
            max_open_connections,
            req,
            warmup,
            worker_count,
            send_rate_channel: bounded(10),
            send_num_packets_channel: bounded(10),
            error_rate_channel: bounded(10),
        }
    }

    /// Builds the tokio runtime used by `run_scenario`: the default
    /// multi-threaded runtime when `worker_count` is 0, otherwise one with
    /// exactly `worker_count` worker threads.
    fn create_runtime(&self) -> Runtime {
        if self.worker_count == 0 {
            tokio::runtime::Runtime::new().unwrap()
        } else {
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(self.worker_count)
                .enable_all()
                .build()
                .unwrap()
        }
    }

    /// Drives the scenario: spawns requests in batches of 10, separated by an
    /// adaptive delay that steers the measured success rate toward the
    /// scenario's target, and publishes rate samples roughly every 100 ms.
    async fn run_reqwests(&mut self) {
        let max_tokio_spawn: usize = self.max_open_connections;
        let mut send_rate: f32 = self.scenario.rate(Duration::from_millis(0));
        let client = reqwest::Client::new();
        // Bounds the number of in-flight requests.
        let semaphore = Arc::new(Semaphore::new(max_tokio_spawn));
        let num_successful = Arc::new(AtomicUsize::new(0));
        let num_error = Arc::new(AtomicUsize::new(0));
        // `.max(1)` guards against a target rate below 1 req/s, which would
        // truncate to 0 and panic on division by zero.
        let mut send_delay = Duration::from_micros(1_000_000 / (send_rate as u64).max(1));
        let mut previous_count: usize = 0;
        let mut previous_error: usize = 0;
        let mut previous_time = Instant::now();
        let mut start = Instant::now();
        let mut elapsed = start.elapsed();
        let test_duration = self.scenario.duration();
        let mut has_warmed_up = false;
        while elapsed.as_millis() < test_duration {
            for _ in 0..10 {
                // A permit is held for the lifetime of each request task.
                let permit = semaphore.clone().acquire_owned().await.unwrap();
                let ns = num_successful.clone();
                let ne = num_error.clone();
                let fut = client.execute(self.req.try_clone().unwrap());
                tokio::spawn(async move {
                    let res = fut.await;
                    drop(permit);
                    match res {
                        Ok(r) => {
                            if r.status().is_success() {
                                ns.fetch_add(1, Ordering::Relaxed);
                            } else {
                                // Non-2xx responses count as errors.
                                ne.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                        _ => {
                            ne.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
            // Yield to the runtime while pacing; the original
            // `std::thread::sleep` blocked the executor thread, starving the
            // spawned request tasks instead of letting them run.
            tokio::time::sleep(send_delay).await;
            let prev_elapsed = previous_time.elapsed();
            // Re-measure and adjust roughly every 100 ms.
            if prev_elapsed.as_millis() > 100 {
                elapsed = start.elapsed();
                let suc = num_successful.load(Ordering::Relaxed);
                let error = num_error.load(Ordering::Relaxed);
                let delta: f64 = suc as f64 - previous_count as f64;
                let suc_rate = delta as u128 * 1_000_000 / prev_elapsed.as_micros();
                let delta: f64 = error as f64 - previous_error as f64;
                let error_rate = delta as u128 * 1_000_000 / prev_elapsed.as_micros();
                send_rate = self.scenario.rate(elapsed);
                previous_time = Instant::now();
                previous_count = suc;
                previous_error = error;
                // Sigmoid controller: adjustment in (0, 2), equal to 1 when
                // the achieved rate matches the target. Scaling the current
                // delay by it nudges the rate toward the scenario's target.
                let rate_delta = suc_rate as f64 - send_rate as f64;
                let adjustment = 2. / (1. + (-(rate_delta as f64) / 10000.).exp());
                send_delay =
                    Duration::from_nanos((send_delay.as_nanos() as f64 * adjustment) as u64);
                if self.warmup && !has_warmed_up {
                    // Converged when the controller is within 1% of neutral;
                    // restart the clock so warmup is excluded from results.
                    if (adjustment - 1.).abs() < 0.01 {
                        has_warmed_up = true;
                        start = Instant::now();
                    }
                } else {
                    // try_send: a slow or absent consumer must not stall us.
                    let _ = self
                        .send_rate_channel
                        .0
                        .try_send((elapsed.as_millis() as f32, suc_rate as f32));
                    let _ = self.send_num_packets_channel.0.try_send(suc);
                    let _ = self
                        .error_rate_channel
                        .0
                        .try_send((elapsed.as_millis() as f32, error_rate as f32));
                }
            }
        }
        // Re-acquire every permit: waits for all in-flight requests to finish.
        let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
    }
}
/// Worker body for `fire_hose`: sends `request` as fast as the semaphore
/// allows for `test_duration` milliseconds, reporting per-interval deltas on
/// the provided channels keyed by `thread_id`.
///
/// * `max_tokio_spawn` — this worker's share of the connection budget.
/// * `num_packets_channel` — receives the per-interval success *delta*
///   (the parent accumulates deltas from every worker).
/// * `packet_rate_channel` / `error_rate_channel` — receive
///   `(thread_id, rate-per-second)` samples roughly every 100 ms.
async fn fire_other_thread(
    max_tokio_spawn: usize,
    test_duration: u128,
    request: reqwest::Request,
    num_packets_channel: Sender<usize>,
    packet_rate_channel: Sender<(usize, f32)>,
    error_rate_channel: Sender<(usize, f32)>,
    thread_id: usize,
) {
    let client = reqwest::Client::new();
    // Caps this worker's in-flight requests.
    let semaphore = Arc::new(Semaphore::new(max_tokio_spawn));
    let num_successful = Arc::new(AtomicUsize::new(0));
    let num_error = Arc::new(AtomicUsize::new(0));
    let mut previous_count: usize = 0;
    let mut previous_error: usize = 0;
    let mut previous_time = Instant::now();
    let start = Instant::now();
    while start.elapsed().as_millis() < test_duration {
        // No pacing on purpose: saturate up to the permit limit.
        for _ in 0..10 {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let ns = num_successful.clone();
            let ne = num_error.clone();
            let fut = client.execute(request.try_clone().unwrap());
            tokio::spawn(async move {
                let res = fut.await;
                drop(permit);
                match res {
                    Ok(r) => {
                        if r.status().is_success() {
                            ns.fetch_add(1, Ordering::Relaxed);
                        } else {
                            // Non-2xx responses count as errors.
                            ne.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                    _ => {
                        ne.fetch_add(1, Ordering::Relaxed);
                    }
                }
            });
        }
        let prev_elapsed = previous_time.elapsed();
        // Report roughly every 100 ms.
        if prev_elapsed.as_millis() > 100 {
            // Plain `load` replaces the original `fetch_add(0, ..)`, which
            // was a needless read-modify-write used as a read.
            let suc = num_successful.load(Ordering::Relaxed);
            let error = num_error.load(Ordering::Relaxed);
            // Counters are monotonically increasing, so the subtractions
            // cannot underflow.
            let suc_delta = suc - previous_count;
            let err_delta = error - previous_error;
            // Divide both rates by the same captured interval; the original
            // re-read the clock for the success rate, giving the two rates
            // slightly different denominators.
            let rate = suc_delta as u128 * 1_000_000 / prev_elapsed.as_micros();
            let error_rate = err_delta as u128 * 1_000_000 / prev_elapsed.as_micros();
            previous_time = Instant::now();
            previous_count = suc;
            previous_error = error;
            // try_send: never block the sender on a slow aggregator.
            let _ = packet_rate_channel.try_send((thread_id, rate as f32));
            let _ = num_packets_channel.try_send(suc_delta);
            let _ = error_rate_channel.try_send((thread_id, error_rate as f32));
        }
    }
    // Re-acquire every permit: waits for all in-flight requests to finish.
    let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
}