//! pdc-core 0.1.2
//!
//! A network load testing library.
use hyper::Method;
use hyper::{client::HttpConnector, Body, Client, Request, Uri};

use super::HttpConfig;
use super::TrafficGenerator;
use crate::scenario::Scenario;

use crossbeam_channel::{bounded, Receiver, Sender};
use std::collections::BTreeMap;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::{Duration, Instant};
use tokio;
use tokio::sync::Semaphore;
/// A hyper based generator. This generator uses HTTP multiplexing, and requires that the target server supports
/// HTTP2. Much like the reqwest generator, `HyperGenerator` is very sensitive to
/// latency. For good performance, it is recommended to test servers running on the same LAN as your
/// machine. This Generator will reuse TCP connections for requests, and all requests will be sent from the same IP.
///
/// ![Graph of RequestGenerator](https://i.imgur.com/LKeZhWM.gif)
pub struct HyperGenerator<S: Scenario> {
    /// Scenario driving the target send rate over time and the test duration.
    pub scenario: S,
    /// The HTTP request (URL, method, body) sent on every iteration.
    pub request: HttpConfig,
    /// This is the maximum amount of TCP file descriptors **being used** at any one time.
    /// The actual number of open connections will be higher, because it takes
    /// time to purge old fd's.
    pub max_open_connections: usize,
    /// This controls the amount of worker threads that the backing tokio runtime
    /// uses. Setting this value to zero will create a default multithreaded runtime
    /// equivalent to tokio::runtime::Runtime::new()
    worker_count: usize,
    /// Sends time series data on the send rate of the generator.
    /// Tuples are (elapsed milliseconds, successful requests per second).
    send_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
    /// Sends the total number of successful requests sent.
    send_num_packets_channel: (Sender<usize>, Receiver<usize>),
    /// Sends the error rate as (elapsed milliseconds, failed requests per second).
    error_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
    /// True if the generator should warm up before sending data.
    warmup: bool,
}

impl<T: Scenario> TrafficGenerator<T> for HyperGenerator<T> {
    /// Runs the configured scenario to completion on a freshly built tokio runtime.
    fn run_scenario(&mut self) {
        let rt = self.create_runtime();
        rt.block_on(self.run_reqwests())
    }

    /// Sends requests as fast as possible with no rate limiting.
    ///
    /// Spawns one OS thread per core (capped at `worker_count` when non-zero),
    /// each pinned to a core and running a single-threaded tokio runtime.
    /// Workers report per-interval success/error rates and sent counts over
    /// channels; this thread aggregates them and republishes totals on the
    /// generator's public channels until the scenario's duration elapses.
    fn fire_hose(&mut self) {
        // Latest reported rate per worker, keyed by the worker's core/thread id.
        let mut suc_rates: BTreeMap<usize, f32> = BTreeMap::new();
        let mut error_rates: BTreeMap<usize, f32> = BTreeMap::new();
        let mut sent = 0;
        let core_ids = core_affinity::get_core_ids().unwrap();

        let num_cores = if self.worker_count == 0 {
            core_ids.len()
        } else {
            self.worker_count
        };

        let num_sent_channel: (Sender<usize>, Receiver<usize>) = bounded(num_cores * 2);
        // ThreadID, rate
        let rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) = bounded(num_cores * 2);
        let error_rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) =
            bounded(num_cores * 2);
        // BUG FIX: the previous loop checked `i >= num_cores` AFTER spawning,
        // which started one more worker thread than requested. `take` caps the
        // iteration before the spawn instead.
        for thread_id in core_ids.into_iter().take(num_cores) {
            let fut = fire_other_thread(
                self.max_open_connections / num_cores,
                self.scenario.duration(),
                num_sent_channel.0.clone(),
                rate_channel.0.clone(),
                error_rate_channel.0.clone(),
                thread_id.id,
                self.request.clone(),
            );
            std::thread::spawn(move || {
                core_affinity::set_for_current(thread_id);
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(fut)
            });
        }

        let start = Instant::now();
        while start.elapsed() < Duration::from_millis(self.scenario.duration() as u64) {
            // Drain everything the workers reported since the last pass.
            while let Ok(rate) = rate_channel.1.try_recv() {
                suc_rates.insert(rate.0, rate.1);
            }
            while let Ok(rate) = error_rate_channel.1.try_recv() {
                error_rates.insert(rate.0, rate.1);
            }
            while let Ok(s) = num_sent_channel.1.try_recv() {
                sent += s;
            }

            let _ = self.send_num_packets_channel.0.try_send(sent);

            // Aggregate per-worker rates into file-wide totals.
            let tot_suc_rate: f32 = suc_rates.values().sum();
            let _ = self
                .send_rate_channel
                .0
                .try_send((start.elapsed().as_millis() as f32, tot_suc_rate));

            let tot_fail_rate: f32 = error_rates.values().sum();
            let _ = self
                .error_rate_channel
                .0
                .try_send((start.elapsed().as_millis() as f32, tot_fail_rate));
            // This is needed to prevent resource contention over the channels
            std::thread::sleep(Duration::from_millis(50));
        }
    }

    fn set_scenario(&mut self, schem: T) {
        self.scenario = schem;
    }

    fn get_scenario(&self) -> &T {
        &self.scenario
    }

    /// Not implemented for this generator; sends are driven by
    /// `run_scenario` / `fire_hose` instead of one packet at a time.
    fn send_packet(&mut self) {
        todo!()
    }

    fn get_data_rate_channel(&self) -> Receiver<(f32, f32)> {
        self.send_rate_channel.1.clone()
    }

    fn get_sent_packets_channel(&self) -> Receiver<usize> {
        self.send_num_packets_channel.1.clone()
    }

    fn get_error_rate_channel(&self) -> Receiver<(f32, f32)> {
        self.error_rate_channel.1.clone()
    }
}

impl<S: Scenario> HyperGenerator<S> {
    /// Constructs a new [`HyperGenerator`].
    ///
    /// * `scenario` — drives the target send rate and the test duration.
    /// * `request` — the HTTP request sent on every iteration.
    /// * `max_open_connections` — ceiling on concurrently in-flight requests.
    /// * `worker_count` — tokio worker threads; `0` selects a default
    ///   multi-threaded runtime.
    /// * `warmup` — when true, metrics are withheld until the send rate settles.
    pub fn new(
        scenario: S,
        request: HttpConfig,
        max_open_connections: usize,
        worker_count: usize,
        warmup: bool,
    ) -> Self {
        // BUG FIX: with worker_count == 0 the channels used to be created with
        // capacity 0 — crossbeam zero-capacity channels are rendezvous channels,
        // so `try_send` fails unless a receiver is blocked in `recv` at that
        // exact instant, and all metrics were silently dropped. Keep capacity >= 2.
        let cap = worker_count.max(1) * 2;
        HyperGenerator {
            scenario,
            request,
            max_open_connections,
            worker_count,
            send_rate_channel: bounded(cap),
            send_num_packets_channel: bounded(cap),
            error_rate_channel: bounded(cap),
            warmup,
        }
    }

    /// Builds the tokio runtime backing [`Self::run_scenario`]. A `worker_count`
    /// of 0 yields tokio's default multi-threaded runtime.
    fn create_runtime(&self) -> tokio::runtime::Runtime {
        if self.worker_count == 0 {
            tokio::runtime::Runtime::new().unwrap()
        } else {
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(self.worker_count)
                .enable_all()
                .build()
                .unwrap()
        }
    }

    //TODO: Give this the same makeover as fire method
    /// Sends requests in batches of 100, pacing batches with `send_delay` and
    /// adjusting that delay with a sigmoid controller so the measured success
    /// rate tracks the scenario's requested rate. Publishes send-rate,
    /// error-rate and sent-count metrics roughly every 100 ms.
    async fn run_reqwests(&mut self) {
        let max_tokio_spawn = self.max_open_connections;
        let mut send_rate: f32 = self.scenario.rate(Duration::from_millis(0));
        let test_duration = self.scenario.duration();
        let client: Client<HttpConnector, Body> = Client::builder()
            .pool_idle_timeout(Duration::from_secs(30))
            .http2_only(true)
            .build_http();

        // Each spawned request task holds one permit, capping in-flight requests.
        let semaphore = Arc::new(Semaphore::new(max_tokio_spawn));
        let num_successful = Arc::new(AtomicUsize::new(0));
        let num_sent = Arc::new(AtomicUsize::new(0));
        let num_fail = Arc::new(AtomicUsize::new(0));

        let mut previous_suc: usize = 0;
        let mut previous_fail: usize = 0;

        let mut previous_time = Instant::now();
        let mut start = Instant::now();
        // BUG FIX: a requested rate below 1.0 used to truncate to 0 and panic
        // with a divide-by-zero; clamp the divisor to at least 1.
        let mut send_delay = Duration::from_micros(1_000_000 / (send_rate as u64).max(1));
        let mut has_warmed_up = false;

        let uri = Uri::try_from(self.request.url.clone()).unwrap();
        let method = Method::try_from(self.request.method.as_str()).unwrap();

        while start.elapsed().as_millis() < test_duration {
            for _ in 0..100 {
                let permit = semaphore.clone().acquire_owned().await.unwrap();
                let suc_clone = num_successful.clone();
                let tot_clone = num_sent.clone();
                let fail_clone = num_fail.clone();

                let req = Request::builder()
                    .uri(uri.clone())
                    .method(method.clone())
                    .body(hyper::Body::from(self.request.body.clone()))
                    .unwrap();
                let fut = client.request(req);
                tokio::spawn(async move {
                    let res = fut.await;

                    tot_clone.fetch_add(1, Ordering::Relaxed);
                    match res {
                        Ok(r) => {
                            if r.status().is_success() {
                                suc_clone.fetch_add(1, Ordering::Relaxed);
                            } else {
                                fail_clone.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                        Err(_e) => {
                            fail_clone.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                    drop(permit);
                });
            }
            // BUG FIX: was std::thread::sleep, which blocks the tokio worker
            // thread it runs on and stalls any request tasks scheduled there;
            // an async sleep yields to the runtime instead.
            tokio::time::sleep(send_delay).await;

            let prev_elapsed = previous_time.elapsed();
            if prev_elapsed.as_millis() > 100 {
                send_rate = self.scenario.rate(start.elapsed());
                let suc = num_successful.load(Ordering::Relaxed);
                let suc_delta: f64 = suc as f64 - previous_suc as f64;
                let suc_rate = suc_delta as u128 * 1000000 / (prev_elapsed.as_micros());

                // Sigmoid in (0, 2): shrinks the delay when below the target
                // rate, grows it when overshooting; 1.0 means "on target".
                let adjustment = 2. / (1. + (-(suc_rate as f64 - send_rate as f64) / 10000.).exp());

                send_delay =
                    Duration::from_nanos((send_delay.as_nanos() as f64 * adjustment) as u64);

                let fail = num_fail.load(Ordering::Relaxed);
                let fail_delta: f64 = fail as f64 - previous_fail as f64;
                let fail_rate = fail_delta as u128 * 1000000 / (prev_elapsed.as_micros());

                let test_time = start.elapsed().as_millis();

                if self.warmup && !has_warmed_up {
                    // Warmed up once the controller stops adjusting the delay;
                    // restart the clock so the warm-up period isn't reported.
                    if (adjustment - 1.).abs() < 0.01 {
                        has_warmed_up = true;
                        start = Instant::now();
                    }
                } else {
                    let _ = self
                        .send_rate_channel
                        .0
                        .try_send((test_time as f32, suc_rate as f32));
                    let _ = self
                        .error_rate_channel
                        .0
                        .try_send((test_time as f32, fail_rate as f32));
                    let _ = self.send_num_packets_channel.0.try_send(suc);
                }

                previous_suc = suc;
                previous_fail = fail;
                previous_time = Instant::now();
            }
        }

        // Wait for the last permits to get used
        let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
    }
}

async fn fire_other_thread(
    max_tokio_spawn: usize,
    test_duration: u128,
    num_packets_channel: Sender<usize>,
    packet_rate_channel: Sender<(usize, f32)>,
    error_rate_channel: Sender<(usize, f32)>,
    thread_id: usize,
    rconf: HttpConfig,
) {
    let client: Client<HttpConnector, Body> = Client::builder()
        .pool_idle_timeout(Duration::from_secs(30))
        .http2_only(true)
        .build_http();

    let semaphore = Arc::new(Semaphore::new(max_tokio_spawn as usize));
    let num_successful = Arc::new(AtomicUsize::new(0));
    let num_sent = Arc::new(AtomicUsize::new(0));
    let num_fail = Arc::new(AtomicUsize::new(0));

    let mut previos_suc: usize = 0;
    let mut previos_fail: usize = 0;
    let mut previos_time = Instant::now();
    let start = Instant::now();

    let uri = Uri::try_from(rconf.url).unwrap();
    let method = Method::try_from(rconf.method.as_str()).unwrap();

    while start.elapsed().as_millis() < test_duration {
        for _ in 0..100 {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let suc_clone = num_successful.clone();
            let tot_clone = num_sent.clone();
            let fail_clone = num_fail.clone();

            let req = Request::builder()
                .uri(uri.clone())
                .method(method.clone())
                .body(hyper::Body::from(rconf.body.clone()))
                .unwrap();
            let fut = client.request(req);
            tokio::spawn(async move {
                let res = fut.await;

                tot_clone.fetch_add(1, Ordering::Relaxed);
                match res {
                    Ok(r) => {
                        if r.status().is_success() {
                            suc_clone.fetch_add(1, Ordering::Relaxed);
                        } else {
                            fail_clone.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                    Err(_e) => {
                        fail_clone.fetch_add(1, Ordering::Relaxed);
                    }
                }
                drop(permit);
            });
        }

        let prev_elasped = previos_time.elapsed();
        if prev_elasped.as_millis() > 100 {
            let suc = num_successful.load(Ordering::Relaxed);
            let suc_delta: f64 = suc as f64 - previos_suc as f64;
            let suc_rate = suc_delta as u128 * 1000000 / (prev_elasped.as_micros());

            let fail = num_fail.load(Ordering::Relaxed);
            let fail_delta: f64 = fail as f64 - previos_fail as f64;
            let fail_rate = fail_delta as u128 * 1000000 / (prev_elasped.as_micros());

            let _ = packet_rate_channel.try_send((thread_id, suc_rate as f32));
            let _ = error_rate_channel.try_send((thread_id, fail_rate as f32));
            let _ = num_packets_channel.try_send(suc_delta as usize);

            previos_suc = suc;
            previos_fail = fail;
            previos_time = Instant::now();
        }
    }

    // Wait for the last permits to get used
    let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
}