strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
use std::collections::{HashMap, VecDeque};
use std::io::IsTerminal;
use std::time::Duration;

use tokio::sync::watch;

use crate::args::TesterArgs;
use crate::charts;
use crate::distributed::summary::{
    Percentiles, SummaryPercentiles, compute_summary_stats, print_summary,
};
use crate::error::AppResult;
use crate::metrics::AggregatedMetricSample;
use crate::shutdown::ShutdownSender;
use crate::sinks::config::SinkStats;
use crate::sinks::writers::write_sinks;
use crate::ui::{model::UiData, render::setup_render_ui};

use super::shared::{AgentSnapshot, aggregate_snapshots, record_aggregated_sample, update_ui};

pub(in crate::distributed::controller) enum OutputEvent {
    AgentStateUpdated,
    SinkTick,
}

pub(in crate::distributed::controller) struct DistributedOutputState {
    charts_enabled: bool,
    sink_updates_enabled: bool,
    sink_dirty: bool,
    aggregated_samples: Vec<AggregatedMetricSample>,
    ui_tx: Option<watch::Sender<UiData>>,
    shutdown_tx: Option<ShutdownSender>,
    ui_latency_window: VecDeque<(u64, u64)>,
    ui_rps_window: VecDeque<(u64, u64)>,
}

pub(in crate::distributed::controller) fn setup_output_state(
    args: &TesterArgs,
) -> DistributedOutputState {
    let streaming_enabled = args.distributed_stream_summaries;
    let ui_enabled = streaming_enabled && !args.no_ui && std::io::stdout().is_terminal();
    let (ui_tx, shutdown_tx) = if ui_enabled {
        let target_duration = Duration::from_secs(args.target_duration.get());
        let (shutdown_tx, _) = crate::system::shutdown_handlers::shutdown_channel();
        let (ui_tx, _) = watch::channel(UiData {
            target_duration,
            ui_window_ms: args.ui_window_ms.get(),
            no_color: args.no_color,
            ..UiData::default()
        });
        let _ui_handle = setup_render_ui(&shutdown_tx, &ui_tx);
        (Some(ui_tx), Some(shutdown_tx))
    } else {
        (None, None)
    };

    DistributedOutputState {
        charts_enabled: !args.no_charts && streaming_enabled,
        sink_updates_enabled: streaming_enabled && args.sinks.is_some(),
        sink_dirty: false,
        aggregated_samples: Vec::new(),
        ui_tx,
        shutdown_tx,
        ui_latency_window: VecDeque::new(),
        ui_rps_window: VecDeque::new(),
    }
}

pub(in crate::distributed::controller) async fn handle_output_event(
    args: &TesterArgs,
    state: &mut DistributedOutputState,
    agent_states: &HashMap<String, AgentSnapshot>,
    runtime_errors: &mut Vec<String>,
    event: OutputEvent,
) {
    match event {
        OutputEvent::AgentStateUpdated => {
            if state.charts_enabled {
                record_aggregated_sample(&mut state.aggregated_samples, agent_states);
            }
            if let Some(ui_tx) = state.ui_tx.as_ref() {
                update_ui(
                    ui_tx,
                    args,
                    agent_states,
                    &mut state.ui_latency_window,
                    &mut state.ui_rps_window,
                );
            }
            if state.sink_updates_enabled {
                state.sink_dirty = true;
            }
        }
        OutputEvent::SinkTick => {
            if state.sink_updates_enabled && state.sink_dirty {
                if let Err(err) = write_streaming_sinks(args, agent_states).await {
                    runtime_errors.push(err.to_string());
                } else {
                    state.sink_dirty = false;
                }
            }
        }
    }
}

pub(in crate::distributed::controller) async fn finalize_output(
    args: &TesterArgs,
    state: &mut DistributedOutputState,
    agent_states: &HashMap<String, AgentSnapshot>,
    runtime_errors: &mut Vec<String>,
) {
    if agent_states.is_empty() {
        runtime_errors.push("No successful agent reports received.".to_owned());
        send_shutdown_signal(state);
        return;
    }

    let Ok((summary, merged_hist, success_hist)) = aggregate_snapshots(agent_states) else {
        runtime_errors.push("Failed to aggregate agent summaries.".to_owned());
        send_shutdown_signal(state);
        return;
    };

    let (p50, p90, p99) = merged_hist.percentiles();
    let (success_p50, success_p90, success_p99) = success_hist.percentiles();
    let stats = compute_summary_stats(&summary);
    let mut charts_output_path: Option<String> = None;
    if state.charts_enabled {
        match write_aggregated_charts(&state.aggregated_samples, args).await {
            Ok(path) => charts_output_path = path,
            Err(err) => runtime_errors.push(err.to_string()),
        }
    }

    let percentiles = SummaryPercentiles {
        all: Percentiles { p50, p90, p99 },
        ok: Percentiles {
            p50: success_p50,
            p90: success_p90,
            p99: success_p99,
        },
    };

    print_summary(&summary, percentiles, args, charts_output_path.as_deref());

    if let Some(sinks) = args.sinks.as_ref() {
        let sink_stats = SinkStats {
            duration: summary.duration,
            total_requests: summary.total_requests,
            successful_requests: summary.successful_requests,
            error_requests: summary.error_requests,
            timeout_requests: summary.timeout_requests,
            min_latency_ms: summary.min_latency_ms,
            max_latency_ms: summary.max_latency_ms,
            avg_latency_ms: summary.avg_latency_ms,
            p50_latency_ms: p50,
            p90_latency_ms: p90,
            p99_latency_ms: p99,
            success_rate_x100: stats.success_rate_x100,
            avg_rps_x100: stats.avg_rps_x100,
            avg_rpm_x100: stats.avg_rpm_x100,
        };
        if let Err(err) = write_sinks(sinks, &sink_stats).await {
            runtime_errors.push(format!("Sinks: {}", err));
        }
    }

    send_shutdown_signal(state);
}

fn send_shutdown_signal(state: &DistributedOutputState) {
    if let Some(shutdown_tx) = state.shutdown_tx.as_ref() {
        drop(shutdown_tx.send(()));
    }
}

async fn write_streaming_sinks(
    args: &TesterArgs,
    agent_states: &HashMap<String, AgentSnapshot>,
) -> AppResult<()> {
    if agent_states.is_empty() {
        return Ok(());
    }
    let (summary, merged_hist, _success_hist) = aggregate_snapshots(agent_states)?;
    let (p50, p90, p99) = merged_hist.percentiles();
    let stats = compute_summary_stats(&summary);

    if let Some(sinks) = args.sinks.as_ref() {
        let sink_stats = SinkStats {
            duration: summary.duration,
            total_requests: summary.total_requests,
            successful_requests: summary.successful_requests,
            error_requests: summary.error_requests,
            timeout_requests: summary.timeout_requests,
            min_latency_ms: summary.min_latency_ms,
            max_latency_ms: summary.max_latency_ms,
            avg_latency_ms: summary.avg_latency_ms,
            p50_latency_ms: p50,
            p90_latency_ms: p90,
            p99_latency_ms: p99,
            success_rate_x100: stats.success_rate_x100,
            avg_rps_x100: stats.avg_rps_x100,
            avg_rpm_x100: stats.avg_rpm_x100,
        };
        write_sinks(sinks, &sink_stats).await?;
    }
    Ok(())
}

async fn write_aggregated_charts(
    samples: &[AggregatedMetricSample],
    args: &TesterArgs,
) -> AppResult<Option<String>> {
    if args.no_charts || samples.len() < 2 {
        return Ok(None);
    }
    charts::plot_aggregated_metrics(samples, args).await
}