crows 0.3.0

CLI for using the Crows stress testing tool
Documentation
use crows_utils::services::{CoordinatorClient, IterationInfo, RequestInfo, RunId, RunInfo};
use crows_utils::{process_info_handle, InfoHandle};

use std::collections::HashMap;
use std::io::Stdout;
use std::io::{stdout, Write};
use std::time::Duration;

use anyhow::anyhow;
use crossterm::{
    cursor::{self, MoveTo, MoveToNextLine, MoveUp},
    execute,
    style::Print,
    terminal::{self, Clear, ClearType, ScrollUp},
};

#[derive(Default)]
pub struct SummaryStats {
    pub avg: Duration,
    pub min: Duration,
    pub med: Duration,
    pub max: Duration,
    pub p90: Duration,
    pub p95: Duration,
    pub fail_rate: f64,
    pub success_count: usize,
    pub fail_count: usize,
    pub total: usize,
}

#[derive(Default)]
pub struct WorkerState {
    pub active_instances: isize,
    pub capacity: isize,
    pub done: bool,
}

#[derive(Default)]
pub struct BarData {
    pub worker_name: String,
    pub active_vus: usize,
    pub all_vus: usize,
    pub duration: Duration,
    pub left: Duration,
    pub done: bool,
}

pub trait LatencyInfo {
    fn latency(&self) -> f64;
    fn successful(&self) -> bool;
}

impl LatencyInfo for RequestInfo {
    fn latency(&self) -> f64 {
        self.latency.as_secs_f64()
    }

    fn successful(&self) -> bool {
        self.successful
    }
}

impl LatencyInfo for IterationInfo {
    fn latency(&self) -> f64 {
        self.latency.as_secs_f64()
    }

    fn successful(&self) -> bool {
        true
    }
}

pub fn print(
    stdout: &mut Stdout,
    progress_lines: u16,
    lines: Vec<String>,
    bars: &HashMap<String, BarData>,
    last: bool,
) -> anyhow::Result<()> {
    let (_, height) = terminal::size()?;

    execute!(
        stdout,
        MoveTo(0, height - progress_lines as u16),
        Clear(ClearType::FromCursorDown),
        MoveUp(1),
    )?;

    for line in lines {
        let (_, y) = cursor::position()?;
        execute!(stdout, Print(line))?;
        let (_, new_y) = cursor::position()?;
        let n = new_y - y;
        execute!(stdout, ScrollUp(n), MoveUp(n))?;
    }

    execute!(
        stdout,
        MoveTo(0, height - progress_lines as u16 + 1),
        Clear(ClearType::FromCursorDown),
        MoveUp(1),
    )?;

    for (_, bar) in bars {
        if bar.done {
            execute!(
                stdout,
                Print(format!("{}: Done", bar.worker_name,)),
                MoveToNextLine(1),
            )?;
        } else {
            let progress_percentage = bar.duration.as_secs_f64()
                / (bar.duration.as_secs_f64() + bar.left.as_secs_f64())
                * 100 as f64;
            execute!(
                stdout,
                Print(format!(
                    "{}: [{: <25}] {:.2}% ({}/{})",
                    bar.worker_name,
                    "*".repeat((progress_percentage as usize) / 4),
                    progress_percentage,
                    bar.active_vus,
                    bar.all_vus,
                )),
                MoveToNextLine(1),
            )?;
        }
    }
    if last {
        execute!(stdout, Print("\n"),)?;
    }

    stdout.flush()?;

    Ok(())
}

pub fn format_duration(duration: Duration) -> String {
    let secs = duration.as_secs();
    let total_millis = duration.as_millis();
    let total_micros = duration.as_micros();
    let nanos = duration.subsec_nanos();

    if secs > 0 {
        format!("{:.2}s", secs as f64 + nanos as f64 / 1_000_000_000.0)
    } else if total_millis > 0 {
        format!(
            "{:.2}ms",
            total_millis as f64 + (nanos % 1_000_000) as f64 / 1_000_000.0
        )
    } else if total_micros > 0 {
        format!(
            "{:.2}µs",
            total_micros as f64 + (nanos % 1_000) as f64 / 1_000.0
        )
    } else {
        format!("{}ns", nanos)
    }
}

fn calculate_avg(latencies: &[f64]) -> f64 {
    latencies.iter().sum::<f64>() / latencies.len() as f64
}

fn calculate_min(latencies: &[f64]) -> f64 {
    *latencies
        .iter()
        .min_by(|a, b| a.partial_cmp(b).unwrap())
        .unwrap()
}

fn calculate_max(latencies: &[f64]) -> f64 {
    *latencies
        .iter()
        .max_by(|a, b| a.partial_cmp(b).unwrap())
        .unwrap()
}

fn calculate_percentile(latencies: &[f64], percentile: f64) -> f64 {
    let idx = (percentile / 100.0 * latencies.len() as f64).ceil() as usize - 1;
    latencies[idx]
}

fn calculate_median(latencies: &[f64]) -> f64 {
    let mid = latencies.len() / 2;
    if latencies.len() % 2 == 0 {
        (latencies[mid - 1] + latencies[mid]) / 2.0
    } else {
        latencies[mid]
    }
}

pub fn calculate_summary<T>(latencies: &Vec<T>) -> SummaryStats
where
    T: LatencyInfo,
{
    let mut latencies_sorted: Vec<f64> = latencies.iter().map(|l| l.latency()).collect();
    latencies_sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());

    let fail_count = latencies.iter().filter(|l| !l.successful()).count();
    let success_count = latencies.iter().filter(|l| l.successful()).count();
    let fail_rate = fail_count as f64 / latencies.len() as f64 * 100f64;

    SummaryStats {
        avg: Duration::from_secs_f64(calculate_avg(&latencies_sorted)),
        min: Duration::from_secs_f64(calculate_min(&latencies_sorted)),
        max: Duration::from_secs_f64(calculate_max(&latencies_sorted)),
        med: Duration::from_secs_f64(calculate_median(&latencies_sorted)),
        p90: Duration::from_secs_f64(calculate_percentile(&latencies_sorted, 90.0)),
        p95: Duration::from_secs_f64(calculate_percentile(&latencies_sorted, 95.0)),
        total: latencies.len(),
        fail_rate,
        success_count,
        fail_count,
    }
}

pub trait ProgressFetcher {
    #[allow(async_fn_in_trait)]
    async fn get_run_status(
        &mut self,
        id: RunId,
    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>>;
}

impl ProgressFetcher for CoordinatorClient {
    async fn get_run_status(
        &mut self,
        id: RunId,
    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>> {
        CoordinatorClient::get_run_status(self, id).await
    }
}

pub struct LocalProgressFetcher {
    info_handle: InfoHandle,
    worker_name: String,
}

impl LocalProgressFetcher {
    pub fn new(info_handle: InfoHandle, worker_name: String) -> Self {
        Self {
            info_handle,
            worker_name,
        }
    }
}

impl ProgressFetcher for LocalProgressFetcher {
    async fn get_run_status(
        &mut self,
        _: RunId,
    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>> {
        let run_info = process_info_handle(&mut self.info_handle).await;
        Ok(Some(vec![(self.worker_name.clone(), run_info)].into_iter().collect()))
    }
}

pub async fn drive_progress<T>(
    client: &mut T,
    run_id: &RunId,
    worker_names: Vec<String>,
) -> anyhow::Result<()>
where
    T: ProgressFetcher,
{
    let mut stdout = stdout();

    let progress_lines = worker_names.len() as u16;

    let mut all_request_stats: Vec<RequestInfo> = Vec::new();
    let mut all_iteration_stats: Vec<IterationInfo> = Vec::new();
    let mut worker_states: HashMap<String, WorkerState> = HashMap::new();
    let mut bars = HashMap::new();

    for name in worker_names {
        worker_states.insert(name.clone(), Default::default());

        bars.insert(
            name.clone(),
            BarData {
                worker_name: name.clone(),
                left: Duration::from_secs(1),
                ..Default::default()
            },
        );
    }

    loop {
        let mut lines = Vec::new();
        let result = client.get_run_status(run_id.clone()).await.unwrap();

        if worker_states.values().all(|s| s.done) {
            break;
        }

        for (worker_name, run_info) in result.unwrap().iter() {
            let state = worker_states
                .get_mut(worker_name)
                .ok_or(anyhow!("Couldn't findt the worker"))?;
            state.active_instances += run_info.active_instances_delta;
            state.capacity += run_info.capacity_delta;

            all_request_stats.extend(run_info.request_stats.clone());
            all_iteration_stats.extend(run_info.iteration_stats.clone());

            for log_line in &run_info.stdout {
                lines.push(format!(
                    "[INFO][{worker_name}] {}",
                    String::from_utf8_lossy(log_line)
                ));
            }
            for log_line in &run_info.stderr {
                lines.push(format!(
                    "[ERROR][{worker_name}] {}",
                    String::from_utf8_lossy(log_line)
                ));
            }

            if run_info.done {
                state.done = true;
            }

            let bar = bars
                .get_mut(worker_name)
                .ok_or(anyhow!("Couldn't find bar data for worker {worker_name}"))?;
            bar.active_vus = state.active_instances as usize;
            bar.all_vus = state.capacity as usize;
            if let Some(duration) = run_info.elapsed {
                bar.duration = duration;
            }
            if let Some(left) = run_info.left {
                bar.left = left;
            }
            bar.done = state.done;
        }

        print(&mut stdout, progress_lines, lines, &bars, false).unwrap();
        tokio::time::sleep(Duration::from_millis(250)).await;
    }

    let request_summary = calculate_summary(&all_request_stats);
    let iteration_summary = calculate_summary(&all_iteration_stats);

    let mut lines = Vec::new();
    lines.push(format!("\n\nSummary:\n"));
    lines.push(format!(
        "http_req_duration..........: avg={}\tmin={}\tmed={}\tmax={}\tp(90)={}\tp(95)={}\n",
        format_duration(request_summary.avg),
        format_duration(request_summary.min),
        format_duration(request_summary.med),
        format_duration(request_summary.max),
        format_duration(request_summary.p90),
        format_duration(request_summary.p95)
    ));
    lines.push(format!(
        "http_req_failed............: {:.2}%\t{}\t{}\n",
        request_summary.fail_rate, request_summary.success_count, request_summary.fail_count
    ));
    lines.push(format!(
        "http_reqs..................: {}\n",
        request_summary.total
    ));
    lines.push(format!(
        "iteration_duration.........: avg={}\tmin={}\tmed={}\tmax={}\tp(90)={}\tp(95)={}\n",
        format_duration(iteration_summary.avg),
        format_duration(iteration_summary.min),
        format_duration(iteration_summary.med),
        format_duration(iteration_summary.max),
        format_duration(iteration_summary.p90),
        format_duration(iteration_summary.p95)
    ));
    lines.push(format!(
        "iterations.................: {}\n",
        iteration_summary.total
    ));
    lines.push(format!("\n\n"));

    print(&mut stdout, progress_lines, lines, &bars, true)?;

    Ok(())
}