strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
#[cfg(any(test, feature = "fuzzing"))]
use std::path::Path;
#[cfg(any(test, feature = "fuzzing"))]
use std::time::Duration;

#[cfg(any(test, feature = "fuzzing"))]
use tokio::fs::File;
#[cfg(any(test, feature = "fuzzing"))]
use tokio::io::{AsyncBufReadExt, BufReader};

#[cfg(any(test, feature = "fuzzing"))]
use crate::error::{AppError, AppResult, MetricsError};

#[cfg(any(test, feature = "fuzzing"))]
use super::super::{LatencyHistogram, MetricRecord, MetricsRange, MetricsSummary};
#[cfg(any(test, feature = "fuzzing"))]
use super::LogResult;

#[cfg(any(test, feature = "fuzzing"))]
/// Read metrics from a log file and summarize them.
///
/// # Errors
///
/// Returns an error if the log cannot be read or parsed, or if histogram
/// operations fail.
pub async fn read_metrics_log(
    log_path: &Path,
    expected_status_code: u16,
    metrics_range: &Option<MetricsRange>,
    metrics_max: usize,
    warmup: Option<Duration>,
) -> AppResult<LogResult> {
    let warmup_ms = warmup
        .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0);
    let file = File::open(log_path).await.map_err(|err| {
        AppError::metrics(MetricsError::Io {
            context: "open metrics log",
            source: err,
        })
    })?;
    let mut reader = BufReader::new(file);
    let mut line = String::new();
    let mut records = Vec::new();
    let mut metrics_truncated = false;
    let collect_records = metrics_max > 0;
    let mut histogram = LatencyHistogram::new()?;
    let mut success_histogram = LatencyHistogram::new()?;

    let mut total_requests: u64 = 0;
    let mut successful_requests: u64 = 0;
    let mut timeout_requests: u64 = 0;
    let mut latency_sum_ms: u128 = 0;
    let mut success_latency_sum_ms: u128 = 0;
    let mut min_latency_ms: u64 = u64::MAX;
    let mut max_latency_ms: u64 = 0;
    let mut success_min_latency_ms: u64 = u64::MAX;
    let mut success_max_latency_ms: u64 = 0;
    let mut transport_errors: u64 = 0;
    let mut non_expected_status: u64 = 0;
    let mut max_elapsed_ms: u64 = 0;

    loop {
        line.clear();
        let bytes = reader.read_line(&mut line).await.map_err(|err| {
            AppError::metrics(MetricsError::Io {
                context: "read metrics log",
                source: err,
            })
        })?;
        if bytes == 0 {
            break;
        }

        let trimmed = line.trim_end();
        let mut parts = trimmed.split(',');
        let elapsed_ms_raw = match parts.next().and_then(|value| value.parse::<u64>().ok()) {
            Some(value) => value,
            None => continue,
        };
        if elapsed_ms_raw < warmup_ms {
            continue;
        }
        let elapsed_ms = elapsed_ms_raw.saturating_sub(warmup_ms);
        let latency_ms = match parts.next().and_then(|value| value.parse::<u64>().ok()) {
            Some(value) => value,
            None => continue,
        };
        let status_code = match parts.next().and_then(|value| value.parse::<u16>().ok()) {
            Some(value) => value,
            None => continue,
        };
        let timed_out = parts
            .next()
            .and_then(|value| value.parse::<u8>().ok())
            .is_some_and(|value| value != 0);
        let transport_error = parts
            .next()
            .and_then(|value| value.parse::<u8>().ok())
            .is_some_and(|value| value != 0);
        let response_bytes = parts
            .next()
            .and_then(|value| value.parse::<u64>().ok())
            .unwrap_or(0);
        let in_flight_ops = parts
            .next()
            .and_then(|value| value.parse::<u64>().ok())
            .unwrap_or(0);

        total_requests = total_requests.saturating_add(1);
        if status_code == expected_status_code && !timed_out && !transport_error {
            successful_requests = successful_requests.saturating_add(1);
            success_latency_sum_ms = success_latency_sum_ms.saturating_add(u128::from(latency_ms));
            if latency_ms < success_min_latency_ms {
                success_min_latency_ms = latency_ms;
            }
            if latency_ms > success_max_latency_ms {
                success_max_latency_ms = latency_ms;
            }
            success_histogram.record(latency_ms)?;
        }
        if timed_out {
            timeout_requests = timeout_requests.saturating_add(1);
        } else if transport_error {
            transport_errors = transport_errors.saturating_add(1);
        } else if status_code != expected_status_code {
            non_expected_status = non_expected_status.saturating_add(1);
        }
        latency_sum_ms = latency_sum_ms.saturating_add(u128::from(latency_ms));
        if latency_ms < min_latency_ms {
            min_latency_ms = latency_ms;
        }
        if latency_ms > max_latency_ms {
            max_latency_ms = latency_ms;
        }
        if elapsed_ms > max_elapsed_ms {
            max_elapsed_ms = elapsed_ms;
        }
        histogram.record(latency_ms)?;

        if collect_records {
            let seconds_elapsed = elapsed_ms / 1000;
            let in_range = match metrics_range {
                Some(MetricsRange(range)) => range.contains(&seconds_elapsed),
                None => true,
            };

            if in_range {
                if records.len() < metrics_max {
                    records.push(MetricRecord {
                        elapsed_ms,
                        latency_ms,
                        status_code,
                        timed_out,
                        transport_error,
                        response_bytes,
                        in_flight_ops,
                    });
                } else {
                    metrics_truncated = true;
                }
            }
        }
    }

    let duration = Duration::from_millis(max_elapsed_ms);
    let avg_latency_ms = if total_requests > 0 {
        let avg = latency_sum_ms
            .checked_div(u128::from(total_requests))
            .unwrap_or(0);
        u64::try_from(avg).map_or(u64::MAX, |value| value)
    } else {
        0
    };
    let min_latency_ms = if total_requests > 0 {
        min_latency_ms
    } else {
        0
    };
    let success_avg_latency_ms = if successful_requests > 0 {
        let avg = success_latency_sum_ms
            .checked_div(u128::from(successful_requests))
            .unwrap_or(0);
        u64::try_from(avg).map_or(u64::MAX, |value| value)
    } else {
        0
    };
    let success_min_latency_ms = if successful_requests > 0 {
        success_min_latency_ms
    } else {
        0
    };
    let success_max_latency_ms = if successful_requests > 0 {
        success_max_latency_ms
    } else {
        0
    };
    let error_requests = total_requests.saturating_sub(successful_requests);

    Ok(LogResult {
        records,
        summary: MetricsSummary {
            duration,
            total_requests,
            successful_requests,
            error_requests,
            timeout_requests,
            transport_errors,
            non_expected_status,
            min_latency_ms,
            max_latency_ms,
            avg_latency_ms,
            success_min_latency_ms,
            success_max_latency_ms,
            success_avg_latency_ms,
        },
        metrics_truncated,
        latency_sum_ms,
        success_latency_sum_ms,
        histogram,
        success_histogram,
    })
}