goku-core 2.0.1

Goku is an HTTP load testing application written in Rust
Documentation
use std::str::FromStr;

use anyhow::{Context, Result};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::Client;
use tokio::sync::mpsc::Sender;
use tokio::sync::watch::Receiver;
use tokio::time::Instant;

use crate::benchmark::BenchmarkResult;
use crate::settings::{Operation, Settings};

pub async fn run(
    settings: Settings,
    tx: Sender<BenchmarkResult>,
    rx_sigint: Option<Receiver<Option<()>>>,
) -> Result<()> {
    let mut clients = Vec::with_capacity(settings.clients as usize);
    for _ in 0..settings.clients {
        let client = Client::builder()
            .timeout(settings.timeout)
            .danger_accept_invalid_certs(true)
            .tcp_keepalive(settings.keep_alive)
            .build()
            .with_context(|| "Can not create http Client".to_string())?;
        clients.push(client);
    }
    for (id, client) in clients.into_iter().enumerate() {
        tokio::spawn(exec_iterator(
            id,
            settings.clone(),
            client,
            tx.clone(),
            rx_sigint.clone(),
        ));
    }
    Ok(())
}

async fn exec_iterator(
    num_client: usize,
    settings: Settings,
    client: Client,
    tx: Sender<BenchmarkResult>,
    mut rx_sigint: Option<Receiver<Option<()>>>,
) {
    match settings.duration {
        None => {
            by_iterations(num_client, &settings, &client, &tx, &mut rx_sigint).await;
        }
        Some(duration) => {
            by_time(num_client, &settings, &client, tx, &mut rx_sigint, duration).await;
        }
    }
}

async fn by_time(
    num_client: usize,
    settings: &Settings,
    client: &Client,
    tx: Sender<BenchmarkResult>,
    rx_sigint: &mut Option<Receiver<Option<()>>>,
    duration: u64,
) {
    let begin = Instant::now();
    let mut execution_number = 0;
    while begin.elapsed().as_secs() < duration {
        match rx_sigint {
            None => {
                let benchmark_result = exec(num_client, execution_number, client, settings);
                let _ = tx.send(benchmark_result.await).await;
                execution_number += 1;
            }
            Some(rx) => {
                let stop_signal = rx.changed();
                let benchmark_result = exec(num_client, execution_number, client, settings);
                let ack_send_result = tx.send(benchmark_result.await);
                execution_number += 1;
                match tokio::select! {
                _ = ack_send_result =>  None,
                _ = stop_signal => Some(())
                } {
                    None => {}
                    Some(_) => break,
                }
            }
        }
    }
}

async fn by_iterations(
    num_client: usize,
    settings: &Settings,
    client: &Client,
    tx: &Sender<BenchmarkResult>,
    rx_sigint: &mut Option<Receiver<Option<()>>>,
) {
    for execution_number in 0..settings.requests_by_client() {
        match rx_sigint {
            None => {
                let benchmark_result = exec(num_client, execution_number, client, settings).await;
                let _ = tx.send(benchmark_result).await;
            }
            Some(rx) => {
                let benchmark_result = exec(num_client, execution_number, client, settings).await;
                let stop_signal = rx.changed();
                let ack_send_result = tx.send(benchmark_result);

                match tokio::select! {
                    _ = ack_send_result =>  None,
                    _ = stop_signal => Some(())
                } {
                    None => {}
                    Some(_) => break,
                }
            }
        }
    }
}

async fn exec(
    num_client: usize,
    execution: i32,
    client: &Client,
    settings: &Settings,
) -> BenchmarkResult {
    let request_builder = match settings.operation() {
        Operation::Get => client.get(settings.target()),
        Operation::Post => client.post(settings.target()),
        Operation::Head => client.head(settings.target()),
        Operation::Patch => client.patch(settings.target()),
        Operation::Put => client.put(settings.target()),
        Operation::Delete => client.delete(settings.target()),
    };
    let headers_map: HeaderMap = match &settings.headers {
        None => HeaderMap::new(),
        Some(headers) => {
            let mut headers_map: HeaderMap = HeaderMap::new();
            headers.iter().for_each(|h| {
                let name = h.key.as_str();
                let value = h.value.as_str();

                let name = HeaderName::from_str(name).unwrap();
                let value = HeaderValue::from_str(value).unwrap();
                headers_map.insert(name, value);
            });
            headers_map
        }
    };
    let request_builder = match &settings.body {
        None => request_builder,
        Some(body) => request_builder.body(body.to_string()),
    };
    let request = request_builder.headers(headers_map);
    let begin = Instant::now();
    let response = request.send().await;
    let duration_ms = begin.elapsed().as_millis() as u64;
    match response {
        Ok(r) => BenchmarkResult {
            status: r.status().to_string(),
            duration: duration_ms,
            num_client,
            execution,
        },
        Err(e) => {
            let status = match e.status() {
                None => "Failed to connect".to_string(),
                Some(s) => s.to_string(),
            };
            BenchmarkResult {
                status,
                duration: duration_ms,
                num_client,
                execution,
            }
        }
    }
}