strest 0.1.10

Blazing-fast async HTTP load tester in Rust - lock-free design, real-time stats, distributed runs, and optional chart exports for high-load API testing.
Documentation
mod db;

use std::fmt::Write as _;
use std::path::PathBuf;
use std::time::Duration;

use tokio::{
    fs::File,
    io::{AsyncWriteExt, BufWriter},
    sync::mpsc,
    task::JoinHandle,
};
use tokio_rusqlite::Connection;

use crate::error::{AppError, AppResult, MetricsError};

use super::super::{LatencyHistogram, MetricRecord, Metrics, MetricsRange, MetricsSummary};
use super::{LogResult, MetricsLoggerConfig};
use db::{DB_FLUSH_SIZE, DbRecord, flush_db_records};

#[must_use]
pub fn setup_metrics_logger(
    log_path: PathBuf,
    config: MetricsLoggerConfig,
    mut log_rx: mpsc::Receiver<Metrics>,
) -> JoinHandle<AppResult<LogResult>> {
    tokio::spawn(async move {
        let warmup_ms = config
            .warmup
            .map(|duration| u64::try_from(duration.as_millis()).unwrap_or(u64::MAX))
            .unwrap_or(0);
        let file = File::create(&log_path).await.map_err(|err| {
            AppError::metrics(MetricsError::Io {
                context: "create metrics log",
                source: err,
            })
        })?;
        const LOG_BUFFER_SIZE: usize = 256 * 1024;
        let mut writer = BufWriter::with_capacity(LOG_BUFFER_SIZE, file);
        let mut buffer = String::with_capacity(LOG_BUFFER_SIZE);
        let mut records = Vec::new();
        let mut metrics_truncated = false;
        let collect_records = config.metrics_max > 0;
        let mut histogram = LatencyHistogram::new()?;
        let mut success_histogram = LatencyHistogram::new()?;
        let db_conn = if let Some(db_url) = config.db_url.as_deref() {
            let conn = Connection::open(db_url).await.map_err(|err| {
                AppError::metrics(MetricsError::External {
                    context: "open sqlite db",
                    source: Box::new(err),
                })
            })?;
            conn.call(|conn| {
                conn.execute_batch(
                    "CREATE TABLE IF NOT EXISTS metrics (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        elapsed_ms INTEGER NOT NULL,
                        latency_ms INTEGER NOT NULL,
                        status_code INTEGER NOT NULL,
                        timed_out INTEGER NOT NULL,
                        transport_error INTEGER NOT NULL
                    );
                    CREATE INDEX IF NOT EXISTS idx_metrics_elapsed_ms ON metrics(elapsed_ms);",
                )?;
                Ok(())
            })
            .await
            .map_err(|err| {
                AppError::metrics(MetricsError::External {
                    context: "initialize sqlite db",
                    source: Box::new(err),
                })
            })?;
            Some(conn)
        } else {
            None
        };
        let mut db_buffer: Vec<DbRecord> = Vec::new();

        let mut total_requests: u64 = 0;
        let mut successful_requests: u64 = 0;
        let mut timeout_requests: u64 = 0;
        let mut latency_sum_ms: u128 = 0;
        let mut success_latency_sum_ms: u128 = 0;
        let mut min_latency_ms: u64 = u64::MAX;
        let mut max_latency_ms: u64 = 0;
        let mut success_min_latency_ms: u64 = u64::MAX;
        let mut success_max_latency_ms: u64 = 0;
        let mut transport_errors: u64 = 0;
        let mut non_expected_status: u64 = 0;
        let mut max_elapsed_ms: u64 = 0;

        while let Some(msg) = log_rx.recv().await {
            let elapsed_ms_raw = u64::try_from(
                msg.start
                    .saturating_duration_since(config.run_start)
                    .as_millis(),
            )
            .unwrap_or(u64::MAX);
            if elapsed_ms_raw < warmup_ms {
                continue;
            }
            let elapsed_ms = elapsed_ms_raw.saturating_sub(warmup_ms);
            let latency_ms = u64::try_from(msg.response_time.as_millis()).unwrap_or(u64::MAX);

            writeln!(
                &mut buffer,
                "{},{},{},{},{},{},{}",
                elapsed_ms,
                latency_ms,
                msg.status_code,
                u8::from(msg.timed_out),
                u8::from(msg.transport_error),
                msg.response_bytes,
                msg.in_flight_ops
            )
            .map_err(|err| {
                AppError::metrics(MetricsError::External {
                    context: "format metrics log line",
                    source: Box::new(err),
                })
            })?;

            if buffer.len() >= LOG_BUFFER_SIZE {
                writer.write_all(buffer.as_bytes()).await.map_err(|err| {
                    AppError::metrics(MetricsError::Io {
                        context: "write metrics log",
                        source: err,
                    })
                })?;
                buffer.clear();
            }

            total_requests = total_requests.saturating_add(1);
            if msg.status_code == config.expected_status_code
                && !msg.timed_out
                && !msg.transport_error
            {
                successful_requests = successful_requests.saturating_add(1);
                success_latency_sum_ms =
                    success_latency_sum_ms.saturating_add(u128::from(latency_ms));
                if latency_ms < success_min_latency_ms {
                    success_min_latency_ms = latency_ms;
                }
                if latency_ms > success_max_latency_ms {
                    success_max_latency_ms = latency_ms;
                }
                success_histogram.record(latency_ms)?;
            }
            if msg.timed_out {
                timeout_requests = timeout_requests.saturating_add(1);
            } else if msg.transport_error {
                transport_errors = transport_errors.saturating_add(1);
            } else if msg.status_code != config.expected_status_code {
                non_expected_status = non_expected_status.saturating_add(1);
            }
            latency_sum_ms = latency_sum_ms.saturating_add(u128::from(latency_ms));
            if latency_ms < min_latency_ms {
                min_latency_ms = latency_ms;
            }
            if latency_ms > max_latency_ms {
                max_latency_ms = latency_ms;
            }
            if elapsed_ms > max_elapsed_ms {
                max_elapsed_ms = elapsed_ms;
            }
            histogram.record(latency_ms)?;

            if let Some(conn) = db_conn.as_ref() {
                db_buffer.push(DbRecord {
                    elapsed_ms,
                    latency_ms,
                    status_code: msg.status_code,
                    timed_out: msg.timed_out,
                    transport_error: msg.transport_error,
                });
                if db_buffer.len() >= DB_FLUSH_SIZE {
                    flush_db_records(conn, &mut db_buffer).await?;
                }
            }

            if collect_records {
                let seconds_elapsed = elapsed_ms / 1000;
                let in_range = match &config.metrics_range {
                    Some(MetricsRange(range)) => range.contains(&seconds_elapsed),
                    None => true,
                };
                if in_range {
                    if records.len() < config.metrics_max {
                        records.push(MetricRecord {
                            elapsed_ms,
                            latency_ms,
                            status_code: msg.status_code,
                            timed_out: msg.timed_out,
                            transport_error: msg.transport_error,
                            response_bytes: msg.response_bytes,
                            in_flight_ops: msg.in_flight_ops,
                        });
                    } else {
                        metrics_truncated = true;
                    }
                }
            }
        }

        if !buffer.is_empty() {
            writer.write_all(buffer.as_bytes()).await.map_err(|err| {
                AppError::metrics(MetricsError::Io {
                    context: "write metrics log",
                    source: err,
                })
            })?;
        }
        writer.flush().await.map_err(|err| {
            AppError::metrics(MetricsError::Io {
                context: "flush metrics log",
                source: err,
            })
        })?;
        if let Some(conn) = db_conn.as_ref() {
            flush_db_records(conn, &mut db_buffer).await?;
        }
        let duration = Duration::from_millis(max_elapsed_ms);
        let avg_latency_ms = if total_requests > 0 {
            let avg = latency_sum_ms
                .checked_div(u128::from(total_requests))
                .unwrap_or(0);
            u64::try_from(avg).map_or(u64::MAX, |value| value)
        } else {
            0
        };
        let min_latency_ms = if total_requests > 0 {
            min_latency_ms
        } else {
            0
        };
        let success_avg_latency_ms = if successful_requests > 0 {
            let avg = success_latency_sum_ms
                .checked_div(u128::from(successful_requests))
                .unwrap_or(0);
            u64::try_from(avg).map_or(u64::MAX, |value| value)
        } else {
            0
        };
        let success_min_latency_ms = if successful_requests > 0 {
            success_min_latency_ms
        } else {
            0
        };
        let success_max_latency_ms = if successful_requests > 0 {
            success_max_latency_ms
        } else {
            0
        };
        let error_requests = total_requests.saturating_sub(successful_requests);

        Ok(LogResult {
            records,
            summary: MetricsSummary {
                duration,
                total_requests,
                successful_requests,
                error_requests,
                timeout_requests,
                transport_errors,
                non_expected_status,
                min_latency_ms,
                max_latency_ms,
                avg_latency_ms,
                success_min_latency_ms,
                success_max_latency_ms,
                success_avg_latency_ms,
            },
            metrics_truncated,
            latency_sum_ms,
            success_latency_sum_ms,
            histogram,
            success_histogram,
        })
    })
}