strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
use std::path::{Path, PathBuf};

use tracing::info;

use crate::{
    app::{cleanup, export, logs, summary},
    args::{OutputFormat, TesterArgs},
    charts,
    error::AppResult,
    metrics,
    sinks::{config::SinkStats, writers},
};

use super::RunOutcome;
#[cfg(feature = "wasm")]
use crate::wasm_plugins::WasmPluginHost;

pub(super) struct FinalizeContext<'args> {
    pub(super) args: &'args TesterArgs,
    pub(super) charts_enabled: bool,
    pub(super) summary_enabled: bool,
    pub(super) metrics_max: usize,
    pub(super) runtime_errors: Vec<String>,
    pub(super) report: metrics::MetricsReport,
    pub(super) log_handles: Vec<tokio::task::JoinHandle<AppResult<metrics::LogResult>>>,
    pub(super) log_paths: Vec<PathBuf>,
    #[cfg(feature = "wasm")]
    pub(super) plugin_host: Option<&'args mut WasmPluginHost>,
}

pub(super) async fn finalize_run(ctx: FinalizeContext<'_>) -> AppResult<RunOutcome> {
    let FinalizeContext {
        args,
        charts_enabled,
        summary_enabled,
        metrics_max,
        mut runtime_errors,
        report,
        log_handles,
        log_paths,
        #[cfg(feature = "wasm")]
        plugin_host,
    } = ctx;
    #[cfg(feature = "wasm")]
    let mut plugin_host = plugin_host;
    let mut log_results = Vec::new();
    for handle in log_handles {
        match handle.await {
            Ok(Ok(result)) => log_results.push(result),
            Ok(Err(err)) => {
                runtime_errors.push(format!("Metrics log task failed: {}", err));
            }
            Err(err) => {
                runtime_errors.push(format!("Log task join failed: {}", err));
            }
        }
    }

    let (
        summary,
        _chart_records_unused,
        _metrics_truncated_unused,
        histogram,
        latency_sum_ms,
        success_latency_sum_ms,
        success_histogram,
    ) = if !log_results.is_empty() {
        logs::merge_log_results(log_results, metrics_max)?
    } else {
        (
            report.summary,
            Vec::new(),
            false,
            metrics::LatencyHistogram::new()?,
            0,
            0,
            metrics::LatencyHistogram::new()?,
        )
    };
    let latency_sum_ms = if latency_sum_ms == 0 && summary.total_requests > 0 {
        u128::from(summary.avg_latency_ms).saturating_mul(u128::from(summary.total_requests))
    } else {
        latency_sum_ms
    };
    let success_latency_sum_ms = if success_latency_sum_ms == 0 && summary.successful_requests > 0 {
        u128::from(summary.success_avg_latency_ms)
            .saturating_mul(u128::from(summary.successful_requests))
    } else {
        success_latency_sum_ms
    };
    let need_chart_records =
        args.export_csv.is_some() || args.export_json.is_some() || args.export_jsonl.is_some();
    let (chart_records, metrics_truncated) = if need_chart_records && !log_paths.is_empty() {
        match logs::load_log_records(&log_paths, &args.metrics_range, metrics_max).await {
            Ok((records, truncated)) => (records, truncated),
            Err(err) => {
                runtime_errors.push(format!("Failed to load metrics logs: {}", err));
                (Vec::new(), false)
            }
        }
    } else {
        (Vec::new(), false)
    };

    let (mut p50, mut p90, mut p99) = histogram.percentiles();
    let (mut success_p50, mut success_p90, mut success_p99) = success_histogram.percentiles();
    if histogram.count() == 0 && !chart_records.is_empty() {
        let (fallback_p50, fallback_p90, fallback_p99) =
            summary::compute_percentiles(&chart_records);
        p50 = fallback_p50;
        p90 = fallback_p90;
        p99 = fallback_p99;
    }
    if success_histogram.count() == 0 && summary.successful_requests > 0 {
        let expected_status = args.expected_status_code;
        let success_records: Vec<metrics::MetricRecord> = chart_records
            .iter()
            .copied()
            .filter(|record| record.status_code == expected_status)
            .collect();
        let (fallback_p50, fallback_p90, fallback_p99) =
            summary::compute_percentiles(&success_records);
        success_p50 = fallback_p50;
        success_p90 = fallback_p90;
        success_p99 = fallback_p99;
    }

    let mut charts_output_path: Option<String> = None;
    if charts_enabled && !log_paths.is_empty() {
        info!("Plotting charts...");

        match logs::load_chart_data_streaming(
            &log_paths,
            args.expected_status_code,
            &args.metrics_range,
            args.charts_latency_bucket_ms.get(),
        )
        .await
        {
            Ok(chart_data) => {
                if let Some(path) = charts::plot_streaming_metrics(&chart_data, args).await? {
                    info!("Charts saved in {}", path);
                    #[cfg(feature = "wasm")]
                    if let Some(host) = plugin_host.as_mut()
                        && let Err(err) = host.on_artifact("charts", &path)
                    {
                        runtime_errors.push(format!("WASM plugin chart hook failed: {}", err));
                    }
                    charts_output_path = Some(path);
                }
            }
            Err(err) => {
                runtime_errors.push(format!("Failed to build charts: {}", err));
            }
        }
    }

    let summary_stats = summary::compute_summary_stats(&summary);

    if summary_enabled
        && !args.distributed_silent
        && args.output_format != Some(OutputFormat::Quiet)
    {
        let extras = summary::SummaryExtras {
            metrics_truncated,
            charts_output_path: charts_output_path.clone(),
            p50,
            p90,
            p99,
            success_p50,
            success_p90,
            success_p99,
        };
        summary::print_summary(&summary, &extras, &summary_stats, args);
    }

    if let Some(path) = args.output.as_deref()
        && matches!(
            args.output_format,
            Some(OutputFormat::Text | OutputFormat::Quiet)
        )
        && let Err(err) = export_text_summary(
            path,
            &summary,
            &summary_stats,
            args,
            &summary::SummaryExtras {
                metrics_truncated,
                charts_output_path: charts_output_path.clone(),
                p50,
                p90,
                p99,
                success_p50,
                success_p90,
                success_p99,
            },
        )
        .await
    {
        runtime_errors.push(format!("Failed to write output: {}", err));
    }

    if let Some(path) = args.export_csv.as_deref()
        && let Err(err) = export::export_csv(path, &chart_records).await
    {
        runtime_errors.push(format!("Failed to export CSV: {}", err));
    } else {
        #[cfg(feature = "wasm")]
        if let Some(path) = args.export_csv.as_deref()
            && let Some(host) = plugin_host.as_mut()
            && let Err(err) = host.on_artifact("export_csv", path)
        {
            runtime_errors.push(format!("WASM plugin CSV hook failed: {}", err));
        }
    }

    if let Some(path) = args.export_json.as_deref()
        && let Err(err) = export::export_json(path, &summary, &chart_records).await
    {
        runtime_errors.push(format!("Failed to export JSON: {}", err));
    } else {
        #[cfg(feature = "wasm")]
        if let Some(path) = args.export_json.as_deref()
            && let Some(host) = plugin_host.as_mut()
            && let Err(err) = host.on_artifact("export_json", path)
        {
            runtime_errors.push(format!("WASM plugin JSON hook failed: {}", err));
        }
    }

    if let Some(path) = args.export_jsonl.as_deref()
        && let Err(err) = export::export_jsonl(path, &summary, &chart_records).await
    {
        runtime_errors.push(format!("Failed to export JSONL: {}", err));
    } else {
        #[cfg(feature = "wasm")]
        if let Some(path) = args.export_jsonl.as_deref()
            && let Some(host) = plugin_host.as_mut()
            && let Err(err) = host.on_artifact("export_jsonl", path)
        {
            runtime_errors.push(format!("WASM plugin JSONL hook failed: {}", err));
        }
    }

    if let Some(sinks_config) = args.sinks.as_ref() {
        let sink_stats = SinkStats {
            duration: summary.duration,
            total_requests: summary.total_requests,
            successful_requests: summary.successful_requests,
            error_requests: summary.error_requests,
            timeout_requests: summary.timeout_requests,
            min_latency_ms: summary.min_latency_ms,
            max_latency_ms: summary.max_latency_ms,
            avg_latency_ms: summary.avg_latency_ms,
            p50_latency_ms: p50,
            p90_latency_ms: p90,
            p99_latency_ms: p99,
            success_rate_x100: summary_stats.success_rate_x100,
            avg_rps_x100: summary_stats.avg_rps_x100,
            avg_rpm_x100: summary_stats.avg_rpm_x100,
        };
        if let Err(err) = writers::write_sinks(sinks_config, &sink_stats).await {
            runtime_errors.push(format!("Failed to write sinks: {}", err));
        }
    }

    #[cfg(feature = "wasm")]
    if let Some(host) = plugin_host.as_mut() {
        if let Err(err) = host.on_metrics_summary(&summary) {
            runtime_errors.push(format!("WASM plugin summary hook failed: {}", err));
        }
        if let Err(err) = host.on_run_end(&runtime_errors) {
            runtime_errors.push(format!("WASM plugin run_end hook failed: {}", err));
        }
    }

    if !args.keep_tmp && !log_paths.is_empty() {
        for log_path in &log_paths {
            if let Err(err) = cleanup::cleanup_tmp(log_path, Path::new(&args.tmp_path)).await {
                runtime_errors.push(format!("Failed to cleanup tmp data: {}", err));
            }
        }
    }

    Ok(RunOutcome {
        summary,
        histogram,
        success_histogram,
        latency_sum_ms,
        success_latency_sum_ms,
        runtime_errors,
    })
}

async fn export_text_summary(
    path: &str,
    summary: &metrics::MetricsSummary,
    stats: &summary::SummaryStats,
    args: &TesterArgs,
    extras: &summary::SummaryExtras,
) -> Result<(), std::io::Error> {
    if matches!(args.output_format, Some(OutputFormat::Quiet)) {
        tokio::fs::write(path, "").await?;
        return Ok(());
    }
    let lines = summary::summary_lines(summary, extras, stats, args);
    let content = lines.join("\n");
    tokio::fs::write(path, content).await?;
    Ok(())
}