//! pdc-core 0.1.2: a network load testing library.

use crate::generator::TrafficGenerator;
use crate::scenario::Scenario;

use crossbeam_channel::{bounded, Receiver, Sender};
use std::time::{Duration, Instant};
use std::{
    collections::BTreeMap,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};
use tokio::runtime::Runtime;
use tokio::sync::Semaphore;

/// This generator is almost the same as the normal [`ReqwestGenerator`](struct@crate::generator::reqwest_generator::ReqwestGenerator),
/// but it produces more of a "fuzzy" send rate.
///
/// ![Fuzzy Graph](https://liam.warfiel.dev/Fuzzy.gif)
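///
/// A minimal usage sketch, assuming `my_scenario` is a value of some type that
/// implements [`Scenario`](crate::scenario::Scenario):
///
/// ```ignore
/// use pdc_core::generator::TrafficGenerator;
///
/// // `my_scenario` is a placeholder for your own Scenario implementation.
/// let req = reqwest::Client::new()
///     .get("http://localhost:8080/")
///     .build()
///     .unwrap();
/// // At most 512 in-flight connections, default worker count, warm up before reporting.
/// let mut generator = FuzzyReqwestGenerator::new(my_scenario, 512, 0, true, req);
/// // The rate receiver is normally drained from another thread, since the run blocks.
/// let rate_rx = generator.get_data_rate_channel();
/// generator.run_scenario();
/// ```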
pub struct FuzzyReqwestGenerator<S: Scenario> {
    pub scenario: S,
    /// The maximum number of TCP file descriptors **in use** at any one time.
    /// The actual number of open connections will be higher, because it takes
    /// time to purge old fds.
    pub max_open_connections: usize,
    /// The number of worker threads used by the backing tokio runtime.
    /// Setting this value to zero creates a default multithreaded runtime,
    /// equivalent to `tokio::runtime::Runtime::new()`.
    worker_count: usize,
    /// Let this generator warm up to the scenario's rate at `t = 0` before reporting test data.
    pub warmup: bool,
    /// The request to send.
    pub req: reqwest::Request,
    /// Sends time series data on the send rate of the generator.
    send_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
    /// Sends the running total of successfully sent requests.
    send_num_packets_channel: (Sender<usize>, Receiver<usize>),
    /// Sends the error rate.
    error_rate_channel: (Sender<(f32, f32)>, Receiver<(f32, f32)>),
}

impl<T: Scenario> TrafficGenerator<T> for FuzzyReqwestGenerator<T> {
    fn run_scenario(&mut self) {
        let rt = self.create_runtime();
        rt.block_on(self.run_reqwests())
    }
    fn fire_hose(&mut self) {
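        // Latest success and error rates reported by each sender thread, keyed by core id.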
        let mut suc_rates: BTreeMap<usize, f32> = BTreeMap::new();
        let mut error_rates: BTreeMap<usize, f32> = BTreeMap::new();
        let mut sent = 0;
        let core_ids = core_affinity::get_core_ids().unwrap();

        let num_cores = if self.worker_count == 0 {
            core_ids.len()
        } else {
            self.worker_count
        };

        let num_sent_channel: (Sender<usize>, Receiver<usize>) = bounded(num_cores * 2);
        // ThreadID, rate
        let rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) = bounded(num_cores * 2);
        let error_rate_channel: (Sender<(usize, f32)>, Receiver<(usize, f32)>) =
            bounded(num_cores * 2);
        // Spawn one pinned sender thread per worker, each with its own single-threaded runtime.
        for thread_id in core_ids.into_iter().take(num_cores) {
            let fut = fire_other_thread(
                self.max_open_connections / num_cores,
                self.scenario.duration(),
                self.req.try_clone().unwrap(),
                num_sent_channel.0.clone(),
                rate_channel.0.clone(),
                error_rate_channel.0.clone(),
                thread_id.id,
            );
            std::thread::spawn(move || {
                core_affinity::set_for_current(thread_id);
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(fut)
            });
        }

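        // Aggregate the per-thread counts and rates and republish the totals on this
        // generator's own channels until the scenario's duration elapses.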
        let start = Instant::now();
        while start.elapsed() < Duration::from_millis(self.scenario.duration() as u64) {
            while let Ok(rate) = rate_channel.1.try_recv() {
                suc_rates.insert(rate.0, rate.1);
            }
            while let Ok(rate) = error_rate_channel.1.try_recv() {
                error_rates.insert(rate.0, rate.1);
            }
            while let Ok(s) = num_sent_channel.1.try_recv() {
                sent += s;
            }

            let _ = self.send_num_packets_channel.0.try_send(sent);
            let mut tot_suc_rate = 0.;
            for (_, r) in suc_rates.iter() {
                tot_suc_rate += r;
            }
            let _ = self
                .send_rate_channel
                .0
                .try_send((start.elapsed().as_millis() as f32, tot_suc_rate));

            let mut tot_fail_rate = 0.;
            for (_, r) in error_rates.iter() {
                tot_fail_rate += r;
            }
            let _ = self
                .error_rate_channel
                .0
                .try_send((start.elapsed().as_millis() as f32, tot_fail_rate));

            // Sleep briefly so this polling loop doesn't busy-spin on the channels.
            std::thread::sleep(Duration::from_millis(50));
        }
    }

    fn set_scenario(&mut self, scenario: T) {
        self.scenario = scenario;
    }

    fn get_scenario(&self) -> &T {
        &self.scenario
    }

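    /// Sends a single blocking GET to the configured URL using a one-off blocking client.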
    fn send_packet(&mut self) {
        let client = reqwest::blocking::Client::new();
        let _ = client.get(self.req.url().clone()).send();
    }
    fn get_data_rate_channel(&self) -> Receiver<(f32, f32)> {
        self.send_rate_channel.1.clone()
    }

    fn get_sent_packets_channel(&self) -> Receiver<usize> {
        self.send_num_packets_channel.1.clone()
    }

    fn get_error_rate_channel(&self) -> Receiver<(f32, f32)> {
        self.error_rate_channel.1.clone()
    }
}

impl<S: Scenario> FuzzyReqwestGenerator<S> {
    pub fn new(
        scenario: S,
        max_open_connections: usize,
        worker_count: usize,
        warmup: bool,
        req: reqwest::Request,
    ) -> Self {
        FuzzyReqwestGenerator {
            scenario,
            max_open_connections,
            req,
            warmup,
            worker_count,
            send_rate_channel: bounded(10),
            send_num_packets_channel: bounded(10),
            error_rate_channel: bounded(10),
        }
    }

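    /// Builds the backing tokio runtime: a default multithreaded runtime when
    /// `worker_count` is zero, otherwise one with exactly `worker_count` worker threads.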
    fn create_runtime(&self) -> Runtime {
        if self.worker_count == 0 {
            tokio::runtime::Runtime::new().unwrap()
        } else {
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(self.worker_count)
                .enable_all()
                .build()
                .unwrap()
        }
    }

    async fn run_reqwests(&mut self) {
        let max_tokio_spawn: usize = self.max_open_connections;
        let mut send_rate: f32 = self.scenario.rate(Duration::from_millis(0));
        let client = reqwest::Client::new();
        let semaphore = Arc::new(Semaphore::new(max_tokio_spawn));
        let num_successful = Arc::new(AtomicUsize::new(0));
        let num_error = Arc::new(AtomicUsize::new(0));
        let mut send_delay = Duration::from_micros(1_000_000 / send_rate as u64);
        let mut previous_count: usize = 0;
        let mut previous_error: usize = 0;
        let mut previous_time = Instant::now();
        let mut start = Instant::now();
        let mut elapsed = start.elapsed();
        let test_duration = self.scenario.duration();
        let mut has_warmed_up = false;
        let mut flag = false;

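        // Fire requests in batches of 10, sleeping `send_delay` between batches; roughly
        // every 100 ms, re-measure the achieved rate and adjust the delay toward the target.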
        while elapsed.as_millis() < test_duration {
            for _ in 0..10 {
                let permit = semaphore.clone().acquire_owned().await.unwrap();
                let ns = num_successful.clone();
                let ne = num_error.clone();
                let fut = client.execute(self.req.try_clone().unwrap());
                tokio::spawn(async move {
                    let res = fut.await;
                    drop(permit);
                    match res {
                        Ok(r) => {
                            if r.status().is_success() {
                                ns.fetch_add(1, Ordering::Relaxed);
                            } else {
                                ne.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                        _ => {
                            ne.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
            std::thread::sleep(send_delay);

            let prev_elapsed = previous_time.elapsed();
            if prev_elapsed.as_millis() > 100 {
                elapsed = start.elapsed();
                let suc = num_successful.load(Ordering::Relaxed);
                let error = num_error.load(Ordering::Relaxed);

                let delta: f64 = suc as f64 - previous_count as f64;
                let suc_rate = delta as u128 * 1_000_000 / prev_elapsed.as_micros();

                let delta: f64 = error as f64 - previous_error as f64;
                let error_rate = delta as u128 * 1_000_000 / prev_elapsed.as_micros();

                send_rate = self.scenario.rate(elapsed);

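                // Halve the target on every other measurement window; this alternation
                // is what produces the "fuzzy" rate.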
                if flag && has_warmed_up {
                    send_rate /= 2.;
                }
                flag = !flag;

                previous_time = Instant::now();
                previous_count = suc;
                previous_error = error;

                let rate_delta = suc_rate as f64 - send_rate as f64;

                // Logistically tune the delay so the achieved rate tracks the target rate.
                // https://en.wikipedia.org/wiki/Logistic_function
                let adjustment = 2. / (1. + (-(rate_delta as f64) / 10000.).exp());

                send_delay =
                    Duration::from_nanos((send_delay.as_nanos() as f64 * adjustment) as u64);

                if self.warmup && !has_warmed_up {
                    if (adjustment - 1.).abs() < 0.01 {
                        has_warmed_up = true;
                        start = Instant::now();
                    }
                } else {
                    let _ = self
                        .send_rate_channel
                        .0
                        .try_send((elapsed.as_millis() as f32, suc_rate as f32));
                    let _ = self.send_num_packets_channel.0.try_send(suc);
                    let _ = self
                        .error_rate_channel
                        .0
                        .try_send((elapsed.as_millis() as f32, error_rate as f32));
                }
            }
        }

        // Wait for any in-flight requests to finish by reacquiring every permit.
        let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
    }
}

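/// Runs one pinned sender loop: keeps up to `max_tokio_spawn` requests in flight and
/// reports this thread's counts and rates over the provided channels.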
async fn fire_other_thread(
    max_tokio_spawn: usize,
    test_duration: u128,
    request: reqwest::Request,
    num_packets_channel: Sender<usize>,
    packet_rate_channel: Sender<(usize, f32)>,
    error_rate_channel: Sender<(usize, f32)>,
    thread_id: usize,
) {
    let client = reqwest::Client::new();
    let semaphore = Arc::new(Semaphore::new(max_tokio_spawn));
    let num_successful = Arc::new(AtomicUsize::new(0));
    let num_error = Arc::new(AtomicUsize::new(0));

    let mut previous_count: usize = 0;
    let mut previous_error: usize = 0;
    let mut previous_time = Instant::now();
    let start = Instant::now();

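    // Drive requests as fast as the permit budget allows, reporting rates back to the
    // coordinating thread roughly every 100 ms.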
    while start.elapsed().as_millis() < test_duration {
        for _ in 0..10 {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let ns = num_successful.clone();
            let ne = num_error.clone();
            let fut = client.execute(request.try_clone().unwrap());
            tokio::spawn(async move {
                let res = fut.await;
                drop(permit);
                match res {
                    Ok(r) => {
                        if r.status().is_success() {
                            ns.fetch_add(1, Ordering::Relaxed);
                        } else {
                            ne.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                    _ => {
                        ne.fetch_add(1, Ordering::Relaxed);
                    }
                }
            });
        }

        let prev_elapsed = previous_time.elapsed();
        if prev_elapsed.as_millis() > 100 {
            let suc = num_successful.load(Ordering::Relaxed);
            let error = num_error.load(Ordering::Relaxed);

            let suc_delta: f64 = suc as f64 - previous_count as f64;
            let rate = suc_delta as u128 * 1_000_000 / prev_elapsed.as_micros();

            let delta: f64 = error as f64 - previous_error as f64;
            let error_rate = delta as u128 * 1_000_000 / prev_elapsed.as_micros();

            previous_time = Instant::now();
            previous_count = suc;
            previous_error = error;

            let _ = packet_rate_channel.try_send((thread_id, rate as f32));
            let _ = num_packets_channel.try_send(suc_delta as usize);
            let _ = error_rate_channel.try_send((thread_id, error_rate as f32));
        }
    }

    // Wait for any in-flight requests to finish by reacquiring every permit.
    let _ = semaphore.acquire_many(max_tokio_spawn as u32).await;
}