strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
use std::time::Duration;

use crate::error::{AppError, AppResult};
use crate::sinks::config::{PrometheusSinkConfig, SinksConfig};

use super::{
    allocate_port, base_args, positive_u64, run_async_test, run_distributed,
    spawn_http_server_or_skip,
};

#[test]
fn tcp_streaming_controller_writes_sink() -> AppResult<()> {
    run_async_test(async {
        let Some((url, shutdown_tx)) = spawn_http_server_or_skip().await? else {
            return Ok(());
        };
        let controller_port = allocate_port()?;
        let controller_addr = format!("127.0.0.1:{}", controller_port);
        let tmp_dir = tempfile::tempdir()
            .map_err(|err| AppError::distributed(format!("Failed to create temp dir: {}", err)))?;
        let tmp_path = tmp_dir
            .path()
            .to_str()
            .ok_or_else(|| AppError::distributed("Failed to convert tmp path"))?
            .to_owned();
        let sink_path = tmp_dir.path().join("controller.prom");
        let sink_path_str = sink_path
            .to_str()
            .ok_or_else(|| AppError::distributed("Failed to convert sink path"))?
            .to_owned();

        let mut controller_args = base_args(url.clone(), tmp_path.clone())?;
        controller_args.controller_listen = Some(controller_addr.clone());
        controller_args.sinks = Some(SinksConfig {
            update_interval_ms: Some(200),
            prometheus: Some(PrometheusSinkConfig {
                path: sink_path_str.clone(),
            }),
            otel: None,
            influx: None,
        });
        controller_args.distributed_stream_summaries = true;
        controller_args.distributed_stream_interval_ms = Some(positive_u64(200)?);

        let mut agent_args = base_args(url, tmp_path)?;
        agent_args.agent_join = Some(controller_addr);

        let run_result = tokio::time::timeout(
            Duration::from_secs(12),
            run_distributed(controller_args, agent_args),
        )
        .await
        .map_err(|err| {
            AppError::distributed(format!("Timed out waiting for distributed run: {}", err))
        })?;
        run_result?;

        shutdown_tx
            .send(true)
            .map_err(|err| AppError::distributed(format!("Failed to shutdown server: {}", err)))?;

        let metadata = tokio::fs::metadata(&sink_path_str)
            .await
            .map_err(|err| AppError::distributed(format!("Missing controller sink: {}", err)))?;
        if metadata.len() == 0 {
            return Err(AppError::distributed(
                "Expected controller sink to be non-empty",
            ));
        }
        Ok(())
    })
}

#[test]
fn tcp_non_streaming_writes_agent_and_controller_sinks() -> AppResult<()> {
    run_async_test(async {
        let Some((url, shutdown_tx)) = spawn_http_server_or_skip().await? else {
            return Ok(());
        };
        let controller_port = allocate_port()?;
        let controller_addr = format!("127.0.0.1:{}", controller_port);
        let tmp_dir = tempfile::tempdir()
            .map_err(|err| AppError::distributed(format!("Failed to create temp dir: {}", err)))?;
        let tmp_path = tmp_dir
            .path()
            .to_str()
            .ok_or_else(|| AppError::distributed("Failed to convert tmp path"))?
            .to_owned();
        let controller_sink = tmp_dir.path().join("controller.prom");
        let controller_sink_str = controller_sink
            .to_str()
            .ok_or_else(|| AppError::distributed("Failed to convert controller sink path"))?
            .to_owned();
        let agent_sink = tmp_dir.path().join("agent.prom");
        let agent_sink_str = agent_sink
            .to_str()
            .ok_or_else(|| AppError::distributed("Failed to convert agent sink path"))?
            .to_owned();

        let mut controller_args = base_args(url.clone(), tmp_path.clone())?;
        controller_args.controller_listen = Some(controller_addr.clone());
        controller_args.sinks = Some(SinksConfig {
            update_interval_ms: None,
            prometheus: Some(PrometheusSinkConfig {
                path: controller_sink_str.clone(),
            }),
            otel: None,
            influx: None,
        });

        let mut agent_args = base_args(url, tmp_path)?;
        agent_args.agent_join = Some(controller_addr);
        agent_args.sinks = Some(SinksConfig {
            update_interval_ms: None,
            prometheus: Some(PrometheusSinkConfig {
                path: agent_sink_str.clone(),
            }),
            otel: None,
            influx: None,
        });

        let run_result = tokio::time::timeout(
            Duration::from_secs(12),
            run_distributed(controller_args, agent_args),
        )
        .await
        .map_err(|err| {
            AppError::distributed(format!("Timed out waiting for distributed run: {}", err))
        })?;
        run_result?;

        shutdown_tx
            .send(true)
            .map_err(|err| AppError::distributed(format!("Failed to shutdown server: {}", err)))?;

        let controller_meta = tokio::fs::metadata(&controller_sink_str)
            .await
            .map_err(|err| AppError::distributed(format!("Missing controller sink: {}", err)))?;
        if controller_meta.len() == 0 {
            return Err(AppError::distributed(
                "Expected controller sink to be non-empty",
            ));
        }
        let agent_meta = tokio::fs::metadata(&agent_sink_str)
            .await
            .map_err(|err| AppError::distributed(format!("Missing agent sink: {}", err)))?;
        if agent_meta.len() == 0 {
            return Err(AppError::distributed("Expected agent sink to be non-empty"));
        }
        Ok(())
    })
}