strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
use std::time::Duration;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::watch;

use super::protocol::WireArgs;
use super::wire::{apply_wire_args, build_wire_args};
use super::{AgentLocalRunPort, AgentRunOutcome, run_agent, run_controller};
use crate::args::{HttpMethod, LoadMode, PositiveU64, PositiveUsize, Protocol, TesterArgs};
use crate::error::{AppError, AppResult};
use crate::metrics::StreamSnapshot;

mod sink_runs;
mod stability;
mod wire_args;

fn positive_u64(value: u64) -> AppResult<PositiveU64> {
    Ok(PositiveU64::try_from(value)?)
}

fn positive_usize(value: usize) -> AppResult<PositiveUsize> {
    Ok(PositiveUsize::try_from(value)?)
}

fn base_args(url: String, tmp_path: String) -> AppResult<TesterArgs> {
    Ok(TesterArgs {
        command: None,
        replay: false,
        replay_start: None,
        replay_end: None,
        replay_step: None,
        replay_snapshot_interval: None,
        replay_snapshot_start: None,
        replay_snapshot_end: None,
        replay_snapshot_out: None,
        replay_snapshot_format: "json".to_owned(),
        method: HttpMethod::Get,
        protocol: Protocol::Http,
        load_mode: LoadMode::Arrival,
        url: Some(url),
        urls_from_file: false,
        rand_regex_url: false,
        max_repeat: positive_usize(4)?,
        dump_urls: None,
        headers: vec![],
        accept_header: None,
        content_type: None,
        no_ua: false,
        authorized: false,
        data: String::new(),
        form: vec![],
        basic_auth: None,
        aws_session: None,
        aws_sigv4: None,
        data_file: None,
        data_lines: None,
        target_duration: positive_u64(1)?,
        wait_ongoing_requests_after_deadline: false,
        requests: None,
        expected_status_code: 200,
        request_timeout: Duration::from_secs(2),
        redirect_limit: 10,
        disable_keepalive: false,
        disable_compression: false,
        pool_max_idle_per_host: None,
        pool_idle_timeout_ms: None,
        http_version: None,
        connect_timeout: Duration::from_secs(5),
        charts_path: "./charts".to_owned(),
        no_charts: true,
        charts_latency_bucket_ms: positive_u64(100)?,
        verbose: false,
        config: None,
        tmp_path,
        load_profile: None,
        controller_listen: None,
        controller_mode: crate::args::ControllerMode::Auto,
        control_listen: None,
        control_auth_token: None,
        agent_join: None,
        auth_token: None,
        agent_id: None,
        agent_weight: positive_u64(1)?,
        min_agents: positive_usize(1)?,
        agent_wait_timeout_ms: None,
        agent_standby: false,
        agent_reconnect_ms: positive_u64(1000)?,
        agent_heartbeat_interval_ms: positive_u64(1000)?,
        agent_heartbeat_timeout_ms: positive_u64(3000)?,
        keep_tmp: false,
        warmup: None,
        output: None,
        output_format: None,
        time_unit: None,
        export_csv: None,
        export_json: None,
        export_jsonl: None,
        db_url: None,
        log_shards: positive_usize(1)?,
        no_ui: true,
        no_splash: true,
        ui_window_ms: positive_u64(10_000)?,
        summary: false,
        show_selections: false,
        tls_min: None,
        tls_max: None,
        cacert: None,
        cert: None,
        key: None,
        insecure: false,
        http2: false,
        http2_parallel: positive_usize(1)?,
        http3: false,
        alpn: vec![],
        proxy_url: None,
        proxy_headers: vec![],
        proxy_http_version: None,
        proxy_http2: false,
        max_tasks: positive_usize(1)?,
        spawn_rate_per_tick: positive_usize(1)?,
        tick_interval: positive_u64(100)?,
        rate_limit: None,
        burst_delay: None,
        burst_rate: positive_usize(1)?,
        latency_correction: false,
        connect_to: vec![],
        host_header: None,
        ipv6_only: false,
        ipv4_only: false,
        no_pre_lookup: false,
        no_color: false,
        ui_fps: 16,
        stats_success_breakdown: false,
        unix_socket: None,
        metrics_range: None,
        metrics_max: positive_usize(1_000)?,
        rss_log_ms: None,
        alloc_profiler_ms: None,
        alloc_profiler_dump_ms: None,
        alloc_profiler_dump_path: "./alloc-prof".to_owned(),
        scenario: None,
        script: None,
        plugin: vec![],
        install_service: false,
        uninstall_service: false,
        service_name: None,
        sinks: None,
        distributed_silent: false,
        distributed_stream_summaries: false,
        distributed_stream_interval_ms: None,
    })
}

fn run_async_test<F>(future: F) -> AppResult<()>
where
    F: std::future::Future<Output = AppResult<()>>,
{
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|err| AppError::distributed(format!("Failed to build runtime: {}", err)))?;
    runtime.block_on(future)
}

fn allocate_port() -> AppResult<u16> {
    let listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| AppError::distributed(format!("Failed to bind port: {}", err)))?;
    let port = listener
        .local_addr()
        .map_err(|err| AppError::distributed(format!("Failed to read local addr: {}", err)))?
        .port();
    Ok(port)
}

async fn spawn_http_server() -> AppResult<(String, watch::Sender<bool>)> {
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .map_err(|err| AppError::distributed(format!("Failed to bind test server: {}", err)))?;
    let addr = listener
        .local_addr()
        .map_err(|err| AppError::distributed(format!("Failed to read server addr: {}", err)))?;
    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);

    tokio::spawn(async move {
        loop {
            tokio::select! {
                changed = shutdown_rx.changed() => {
                    if changed.is_err() {
                        break;
                    }
                    if *shutdown_rx.borrow() {
                        break;
                    }
                }
                accept = listener.accept() => {
                    let (socket, _) = match accept {
                        Ok(result) => result,
                        Err(_) => break,
                    };
                    tokio::spawn(handle_http(socket));
                }
            }
        }
    });

    Ok((format!("http://{}", addr), shutdown_tx))
}

async fn spawn_http_server_or_skip() -> AppResult<Option<(String, watch::Sender<bool>)>> {
    match spawn_http_server().await {
        Ok(result) => Ok(Some(result)),
        Err(err) if err.to_string().contains("Operation not permitted") => {
            eprintln!("Skipping distributed test: {}", err);
            Ok(None)
        }
        Err(err) => Err(err),
    }
}

async fn handle_http(mut socket: TcpStream) {
    let mut buffer = [0u8; 1024];
    if socket.read(&mut buffer).await.is_err() {
        return;
    }
    if socket
        .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\nOK")
        .await
        .is_err()
    {
        return;
    }
    let _shutdown_result = socket.shutdown().await;
}

async fn run_distributed(controller_args: TesterArgs, agent_args: TesterArgs) -> AppResult<()> {
    let controller_handle =
        tokio::spawn(async move { run_controller(&controller_args, None).await });
    tokio::time::sleep(Duration::from_millis(200)).await;
    let local_port = TestAgentLocalRunPort;
    let agent_result = run_agent(agent_args, &local_port).await;
    let controller_result = controller_handle
        .await
        .map_err(|err| AppError::distributed(format!("Controller task join failed: {}", err)))?;
    agent_result?;
    controller_result?;
    Ok(())
}

struct TestAgentLocalRunPort;

#[async_trait::async_trait]
impl AgentLocalRunPort for TestAgentLocalRunPort {
    async fn run_local(
        &self,
        args: TesterArgs,
        stream_tx: Option<tokio::sync::mpsc::UnboundedSender<StreamSnapshot>>,
        external_shutdown: Option<watch::Receiver<bool>>,
    ) -> AppResult<AgentRunOutcome> {
        let outcome = crate::app::run_local(args, stream_tx, external_shutdown).await?;
        Ok(AgentRunOutcome {
            summary: outcome.summary,
            histogram: outcome.histogram,
            success_histogram: outcome.success_histogram,
            latency_sum_ms: outcome.latency_sum_ms,
            success_latency_sum_ms: outcome.success_latency_sum_ms,
            runtime_errors: outcome.runtime_errors,
        })
    }
}