strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
use std::path::Path;

use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, BufReader};

use crate::args::TesterArgs;
use crate::error::{AppError, AppResult, MetricsError, ValidationError};
use crate::metrics::MetricRecord;

/// Loads metric records from `path`, selecting the reader from the file extension.
///
/// The extension is compared ASCII case-insensitively: `csv`, `json`, and
/// `jsonl`/`ndjson` are recognised.
///
/// # Errors
///
/// Returns `ValidationError::InvalidSnapshotFormat` (carrying the lossy path
/// string) when the extension is missing, not valid UTF-8, or unrecognised.
/// Errors from the selected reader are returned unchanged.
pub(crate) async fn read_records_from_path(path: &Path) -> AppResult<Vec<MetricRecord>> {
    // A missing or non-UTF-8 extension collapses to "", which matches no reader.
    let ext = path.extension().and_then(|raw| raw.to_str()).unwrap_or("");
    let has_ext = |candidate: &str| ext.eq_ignore_ascii_case(candidate);

    if has_ext("csv") {
        return read_csv_records(path).await;
    }
    if has_ext("json") {
        return read_json_records(path).await;
    }
    if has_ext("jsonl") || has_ext("ndjson") {
        return read_jsonl_records(path).await;
    }
    Err(AppError::validation(ValidationError::InvalidSnapshotFormat {
        value: path.to_string_lossy().into_owned(),
    }))
}

/// Loads the records to replay, based on which export path is set in `args`.
///
/// At most one of `export_csv`, `export_json`, and `export_jsonl` may be set;
/// the matching reader is used for it. When none is set, records are read from
/// `args.tmp_path` via `read_tmp_records`.
///
/// # Errors
///
/// Returns `ValidationError::ReplayExportSourceConflict` when more than one
/// export path is set. Errors from the chosen reader are returned unchanged.
pub(super) async fn load_replay_records(args: &TesterArgs) -> AppResult<Vec<MetricRecord>> {
    let sources = (
        args.export_csv.as_deref(),
        args.export_json.as_deref(),
        args.export_jsonl.as_deref(),
    );
    match sources {
        (Some(csv), None, None) => read_csv_records(Path::new(csv)).await,
        (None, Some(json), None) => read_json_records(Path::new(json)).await,
        (None, None, Some(jsonl)) => read_jsonl_records(Path::new(jsonl)).await,
        // No explicit export: fall back to the temporary metrics location.
        (None, None, None) => read_tmp_records(Path::new(&args.tmp_path)).await,
        // Every remaining combination has two or more sources set.
        _ => Err(AppError::validation(
            ValidationError::ReplayExportSourceConflict,
        )),
    }
}

/// Reads replay records from the temporary metrics location at `path`.
///
/// * If `path` is a regular file, it is parsed as a single CSV file.
/// * If `path` is a directory, every entry whose name starts with `metrics-`
///   and ends with `.log` is parsed as CSV and the results are concatenated in
///   the order the directory listing yields them.
///
/// # Errors
///
/// * `MetricsError::Io` when the path cannot be stat'ed or the directory
///   cannot be listed.
/// * `MetricsError::ReplayTmpPathInvalid` when `path` is neither a file nor a
///   directory.
/// * `MetricsError::ReplayTmpNoLogs` when the directory holds no matching log.
/// * Any error returned by `read_csv_records` for an individual file.
pub(super) async fn read_tmp_records(path: &Path) -> AppResult<Vec<MetricRecord>> {
    let stat = tokio::fs::metadata(path).await.map_err(|source| {
        AppError::metrics(MetricsError::Io {
            context: "stat tmp path",
            source,
        })
    })?;
    if stat.is_file() {
        return read_csv_records(path).await;
    }
    if !stat.is_dir() {
        return Err(AppError::metrics(MetricsError::ReplayTmpPathInvalid));
    }

    let mut dir = tokio::fs::read_dir(path).await.map_err(|source| {
        AppError::metrics(MetricsError::Io {
            context: "read tmp directory",
            source,
        })
    })?;
    let mut collected = Vec::new();
    let mut matched_logs = 0usize;
    loop {
        let next = dir.next_entry().await.map_err(|source| {
            AppError::metrics(MetricsError::Io {
                context: "read tmp entry",
                source,
            })
        })?;
        let Some(entry) = next else {
            break;
        };

        // Only `metrics-*.log` files belong to a run; ignore everything else.
        let os_name = entry.file_name();
        let name = os_name.to_string_lossy();
        let is_metrics_log = name.starts_with("metrics-") && name.ends_with(".log");
        if !is_metrics_log {
            continue;
        }

        matched_logs += 1;
        collected.extend(read_csv_records(&entry.path()).await?);
    }

    if matched_logs == 0 {
        return Err(AppError::metrics(MetricsError::ReplayTmpNoLogs));
    }
    Ok(collected)
}

/// Reads replay records from the CSV file at `path`.
///
/// Expected column order: `elapsed_ms, latency_ms, status_code, timed_out,
/// transport_error, response_bytes, in_flight_ops`. The first three columns
/// are required; the remaining ones default to `false` / `0` when absent or
/// unparsable.
///
/// Parsing is best-effort: blank lines are ignored, a first non-blank line
/// starting with `elapsed_ms` is treated as the header, and any row whose
/// required columns are missing or malformed is skipped rather than reported.
/// Whitespace around every column is ignored, so `100, 5, 200` parses the same
/// as `100,5,200`.
///
/// # Errors
///
/// Returns `MetricsError::Io` when the file cannot be opened or a line cannot
/// be read.
pub(super) async fn read_csv_records(path: &Path) -> AppResult<Vec<MetricRecord>> {
    let file = tokio::fs::File::open(path).await.map_err(|err| {
        AppError::metrics(MetricsError::Io {
            context: "open replay file",
            source: err,
        })
    })?;
    let mut reader = BufReader::new(file);
    // One buffer reused for every line to avoid a per-line allocation.
    let mut line = String::new();
    let mut records = Vec::new();
    let mut saw_first_row = false;

    loop {
        line.clear();
        let bytes = reader.read_line(&mut line).await.map_err(|err| {
            AppError::metrics(MetricsError::Io {
                context: "read replay file",
                source: err,
            })
        })?;
        if bytes == 0 {
            break;
        }
        let row = line.trim();
        if row.is_empty() {
            continue;
        }
        // Only the first non-blank row may be the header. A header-looking row
        // further down is still dropped, because its first column is not a number.
        let is_first_row = !saw_first_row;
        saw_first_row = true;
        if is_first_row && row.starts_with("elapsed_ms") {
            continue;
        }
        if let Some(record) = parse_csv_record(row) {
            records.push(record);
        }
    }

    Ok(records)
}

/// Parses one non-blank CSV row; `None` when a required column is missing or malformed.
fn parse_csv_record(row: &str) -> Option<MetricRecord> {
    let mut columns = row.split(',');
    let elapsed_ms = parse_csv_number::<u64>(columns.next())?;
    let latency_ms = parse_csv_number::<u64>(columns.next())?;
    let status_code = parse_csv_number::<u16>(columns.next())?;
    let timed_out = columns.next().map(parse_bool).unwrap_or(false);
    let transport_error = columns.next().map(parse_bool).unwrap_or(false);
    let response_bytes = parse_csv_number::<u64>(columns.next()).unwrap_or(0);
    let in_flight_ops = parse_csv_number::<u64>(columns.next()).unwrap_or(0);
    Some(MetricRecord {
        elapsed_ms,
        latency_ms,
        status_code,
        timed_out,
        transport_error,
        response_bytes,
        in_flight_ops,
    })
}

/// Parses an optional CSV column as a number, ignoring surrounding whitespace.
fn parse_csv_number<T: std::str::FromStr>(column: Option<&str>) -> Option<T> {
    // Integer `FromStr` rejects padded input, so trim first to stay consistent
    // with `parse_bool`, which also trims its column.
    column.and_then(|value| value.trim().parse().ok())
}

pub(super) async fn read_json_records(path: &Path) -> AppResult<Vec<MetricRecord>> {
    let bytes = tokio::fs::read(path).await.map_err(|err| {
        AppError::metrics(MetricsError::Io {
            context: "read replay file",
            source: err,
        })
    })?;
    let payload: ExportJson = serde_json::from_slice(&bytes).map_err(|err| {
        AppError::metrics(MetricsError::External {
            context: "parse JSON",
            source: Box::new(err),
        })
    })?;
    Ok(payload
        .records
        .into_iter()
        .map(|record| MetricRecord {
            elapsed_ms: record.elapsed_ms,
            latency_ms: record.latency_ms,
            status_code: record.status_code,
            timed_out: record.timed_out,
            transport_error: record.transport_error,
            response_bytes: record.response_bytes.unwrap_or(0),
            in_flight_ops: record.in_flight_ops.unwrap_or(0),
        })
        .collect())
}

/// Reads replay records from the JSON Lines export at `path`.
///
/// Each non-blank line is deserialized as an `ExportJsonlLine`. A line is
/// skipped when it carries a `type` other than `"record"`, or when any of
/// `elapsed_ms`, `latency_ms`, or `status_code` is absent. Absent optional
/// fields default to `false` / `0`.
///
/// # Errors
///
/// * `MetricsError::Io` when the file cannot be opened or a line cannot be read.
/// * `MetricsError::External` (context `"parse JSONL"`) when a non-blank line
///   is not valid JSON for `ExportJsonlLine`; parsing stops at that line.
pub(super) async fn read_jsonl_records(path: &Path) -> AppResult<Vec<MetricRecord>> {
    let file = tokio::fs::File::open(path).await.map_err(|source| {
        AppError::metrics(MetricsError::Io {
            context: "open replay file",
            source,
        })
    })?;
    let mut reader = BufReader::new(file);
    // One buffer reused for every line.
    let mut buffer = String::new();
    let mut records = Vec::new();

    loop {
        buffer.clear();
        let read = reader.read_line(&mut buffer).await.map_err(|source| {
            AppError::metrics(MetricsError::Io {
                context: "read replay file",
                source,
            })
        })?;
        if read == 0 {
            break;
        }

        let text = buffer.trim();
        if text.is_empty() {
            continue;
        }
        let entry: ExportJsonlLine = serde_json::from_str(text).map_err(|parse_err| {
            AppError::metrics(MetricsError::External {
                context: "parse JSONL",
                source: Box::new(parse_err),
            })
        })?;

        // Lines without a `type` are accepted; typed lines must be "record".
        if matches!(entry.kind.as_deref(), Some(kind) if kind != "record") {
            continue;
        }
        let (Some(elapsed_ms), Some(latency_ms), Some(status_code)) =
            (entry.elapsed_ms, entry.latency_ms, entry.status_code)
        else {
            continue;
        };

        records.push(MetricRecord {
            elapsed_ms,
            latency_ms,
            status_code,
            timed_out: entry.timed_out.unwrap_or(false),
            transport_error: entry.transport_error.unwrap_or(false),
            response_bytes: entry.response_bytes.unwrap_or(0),
            in_flight_ops: entry.in_flight_ops.unwrap_or(0),
        });
    }

    Ok(records)
}

/// Interprets a text column as a boolean flag.
///
/// After trimming surrounding whitespace, `"1"` and any ASCII-case variant of
/// `"true"` yield `true`; every other input (including the empty string)
/// yields `false`.
fn parse_bool(value: &str) -> bool {
    let token = value.trim();
    if token == "1" {
        return true;
    }
    token.eq_ignore_ascii_case("true")
}

/// Top-level document deserialized by `read_json_records`.
#[derive(Debug, Deserialize)]
struct ExportJson {
    /// Exported entries, each converted into one `MetricRecord`.
    records: Vec<ExportRecord>,
}

/// One entry of the `records` array in a JSON export.
///
/// Field names are the JSON keys. The first five fields are required by the
/// deserializer; the two `Option` fields may be absent.
#[derive(Debug, Deserialize)]
struct ExportRecord {
    elapsed_ms: u64,
    latency_ms: u64,
    status_code: u16,
    timed_out: bool,
    transport_error: bool,
    /// Treated as `0` by `read_json_records` when absent.
    response_bytes: Option<u64>,
    /// Treated as `0` by `read_json_records` when absent.
    in_flight_ops: Option<u64>,
}

/// One line of a JSON Lines export as deserialized by `read_jsonl_records`.
///
/// Every field is optional at the deserialization level; `read_jsonl_records`
/// decides afterwards whether the line yields a record.
#[derive(Debug, Deserialize)]
struct ExportJsonlLine {
    /// Value of the JSON `type` key. Lines where it is present and not
    /// `"record"` are skipped by `read_jsonl_records`.
    #[serde(rename = "type")]
    kind: Option<String>,
    /// Required for a record; the line is skipped when absent.
    elapsed_ms: Option<u64>,
    /// Required for a record; the line is skipped when absent.
    latency_ms: Option<u64>,
    /// Required for a record; the line is skipped when absent.
    status_code: Option<u16>,
    /// Defaults to `false` when absent.
    timed_out: Option<bool>,
    /// Defaults to `false` when absent.
    transport_error: Option<bool>,
    /// Defaults to `0` when absent.
    response_bytes: Option<u64>,
    /// Defaults to `0` when absent.
    in_flight_ops: Option<u64>,
}