strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
use std::collections::BTreeMap;
use std::path::PathBuf;

use tokio::io::BufReader;

use crate::error::{AppError, AppResult, MetricsError};
use crate::metrics;

use super::parsing::{
    HeapItem, LogCursor, PercentileSeries, ensure_len, inc_slot, read_next_record,
};

pub(super) async fn load_chart_data_streaming(
    paths: &[PathBuf],
    expected_status_code: u16,
    metrics_range: &Option<metrics::MetricsRange>,
    latency_bucket_ms: u64,
) -> AppResult<metrics::StreamingChartData> {
    let mut cursors: Vec<LogCursor> = Vec::with_capacity(paths.len());
    for path in paths {
        let file = tokio::fs::File::open(path).await.map_err(|err| {
            AppError::metrics(MetricsError::Io {
                context: "open metrics log",
                source: err,
            })
        })?;
        cursors.push(LogCursor {
            reader: BufReader::new(file),
            line: String::new(),
        });
    }

    let mut heap: std::collections::BinaryHeap<std::cmp::Reverse<HeapItem>> =
        std::collections::BinaryHeap::new();

    for (idx, cursor) in cursors.iter_mut().enumerate() {
        if let Some(record) = read_next_record(cursor).await? {
            heap.push(std::cmp::Reverse(HeapItem {
                elapsed_ms: record.elapsed_ms,
                idx,
                record,
            }));
        }
    }

    let mut avg_buckets: BTreeMap<u64, (u128, u64)> = BTreeMap::new();
    let mut total_buckets: BTreeMap<u64, u64> = BTreeMap::new();
    let mut success_buckets: BTreeMap<u64, u64> = BTreeMap::new();
    let mut error_buckets: BTreeMap<u64, u64> = BTreeMap::new();
    let mut rps_counts: Vec<u32> = Vec::new();
    let mut timeouts: Vec<u32> = Vec::new();
    let mut transports: Vec<u32> = Vec::new();
    let mut non_expected: Vec<u32> = Vec::new();
    let mut status_2xx: Vec<u32> = Vec::new();
    let mut status_3xx: Vec<u32> = Vec::new();
    let mut status_4xx: Vec<u32> = Vec::new();
    let mut status_5xx: Vec<u32> = Vec::new();
    let mut status_other: Vec<u32> = Vec::new();
    let mut inflight_deltas: Vec<i64> = Vec::new();

    let mut latency_buckets_ms: Vec<u64> = Vec::new();
    let mut p50: Vec<u64> = Vec::new();
    let mut p90: Vec<u64> = Vec::new();
    let mut p99: Vec<u64> = Vec::new();
    let mut p50_ok: Vec<u64> = Vec::new();
    let mut p90_ok: Vec<u64> = Vec::new();
    let mut p99_ok: Vec<u64> = Vec::new();

    let bucket_ms = latency_bucket_ms.max(1);
    let mut current_bucket: Option<u64> = None;
    let mut latencies: Vec<u64> = Vec::new();
    let mut latencies_ok: Vec<u64> = Vec::new();
    let mut series = PercentileSeries {
        latency_seconds: &mut latency_buckets_ms,
        p50: &mut p50,
        p90: &mut p90,
        p99: &mut p99,
        p50_ok: &mut p50_ok,
        p90_ok: &mut p90_ok,
        p99_ok: &mut p99_ok,
    };

    while let Some(std::cmp::Reverse(item)) = heap.pop() {
        let record = item.record;
        let sec = record.elapsed_ms / 1000;
        let bucket = record.elapsed_ms.checked_div(bucket_ms).unwrap_or(0);

        if let Some(metrics::MetricsRange(range)) = metrics_range.as_ref()
            && !range.contains(&sec)
        {
            if let Some(cursor) = cursors.get_mut(item.idx)
                && let Some(next) = read_next_record(cursor).await?
            {
                heap.push(std::cmp::Reverse(HeapItem {
                    elapsed_ms: next.elapsed_ms,
                    idx: item.idx,
                    record: next,
                }));
            }
            continue;
        }

        match current_bucket {
            Some(active) if bucket != active => {
                series.push_percentiles_for_sec(
                    active.saturating_mul(bucket_ms),
                    &mut latencies,
                    &mut latencies_ok,
                );
                current_bucket = Some(bucket);
            }
            None => current_bucket = Some(bucket),
            _ => {}
        }

        let bucket_100ms = record.elapsed_ms / 100;
        let entry = avg_buckets.entry(bucket_100ms).or_insert((0, 0));
        entry.0 = entry.0.saturating_add(u128::from(record.latency_ms));
        entry.1 = entry.1.saturating_add(1);

        let total_entry = total_buckets.entry(bucket_100ms).or_insert(0);
        *total_entry = total_entry.saturating_add(1);

        if record.status_code == expected_status_code {
            let success_entry = success_buckets.entry(bucket_100ms).or_insert(0);
            *success_entry = success_entry.saturating_add(1);
        }
        if record.status_code != expected_status_code {
            let error_entry = error_buckets.entry(bucket_100ms).or_insert(0);
            *error_entry = error_entry.saturating_add(1);
        }

        let sec_idx = usize::try_from(sec).unwrap_or(usize::MAX);
        let sec_len = sec_idx.saturating_add(1);
        ensure_len(&mut rps_counts, sec_len);
        inc_slot(&mut rps_counts, sec_idx);

        ensure_len(&mut timeouts, sec_len);
        ensure_len(&mut transports, sec_len);
        ensure_len(&mut non_expected, sec_len);

        if record.timed_out {
            inc_slot(&mut timeouts, sec_idx);
        } else if record.transport_error {
            inc_slot(&mut transports, sec_idx);
        } else if record.status_code != expected_status_code {
            inc_slot(&mut non_expected, sec_idx);
        }

        ensure_len(&mut status_2xx, sec_len);
        ensure_len(&mut status_3xx, sec_len);
        ensure_len(&mut status_4xx, sec_len);
        ensure_len(&mut status_5xx, sec_len);
        ensure_len(&mut status_other, sec_len);

        match record.status_code {
            200..=299 => inc_slot(&mut status_2xx, sec_idx),
            300..=399 => inc_slot(&mut status_3xx, sec_idx),
            400..=499 => inc_slot(&mut status_4xx, sec_idx),
            500..=599 => inc_slot(&mut status_5xx, sec_idx),
            _ => inc_slot(&mut status_other, sec_idx),
        }

        let start_sec = sec_idx;
        let end_total_ms = record.elapsed_ms.saturating_add(record.latency_ms);
        let end_sec = usize::try_from(end_total_ms / 1000).unwrap_or(usize::MAX);
        let end_idx = end_sec.saturating_add(1);
        if inflight_deltas.len() <= end_idx {
            inflight_deltas.resize(end_idx.saturating_add(1), 0);
        }
        if let Some(slot) = inflight_deltas.get_mut(start_sec) {
            *slot = slot.saturating_add(1);
        }
        if let Some(slot) = inflight_deltas.get_mut(end_idx) {
            *slot = slot.saturating_sub(1);
        }

        latencies.push(record.latency_ms);
        if record.status_code == expected_status_code
            && !record.timed_out
            && !record.transport_error
        {
            latencies_ok.push(record.latency_ms);
        }

        if let Some(cursor) = cursors.get_mut(item.idx)
            && let Some(next) = read_next_record(cursor).await?
        {
            heap.push(std::cmp::Reverse(HeapItem {
                elapsed_ms: next.elapsed_ms,
                idx: item.idx,
                record: next,
            }));
        }
    }

    if let Some(active) = current_bucket {
        series.push_percentiles_for_sec(
            active.saturating_mul(bucket_ms),
            &mut latencies,
            &mut latencies_ok,
        );
    }

    let mut inflight: Vec<u32> = Vec::with_capacity(inflight_deltas.len());
    let mut current: i64 = 0;
    for delta in inflight_deltas {
        current = current.saturating_add(delta);
        inflight.push(u32::try_from(current.max(0)).unwrap_or(u32::MAX));
    }

    Ok(metrics::StreamingChartData {
        avg_buckets,
        total_buckets,
        success_buckets,
        error_buckets,
        rps_counts,
        timeouts,
        transports,
        non_expected,
        status_2xx,
        status_3xx,
        status_4xx,
        status_5xx,
        status_other,
        inflight,
        latency_buckets_ms,
        latency_bucket_ms: bucket_ms,
        p50,
        p90,
        p99,
        p50_ok,
        p90_ok,
        p99_ok,
    })
}