#![allow(clippy::uninlined_format_args)]
use anyhow::Result;
use clap::Parser;
use pulzr::{
auth::{ApiKeyConfig, ApiKeyManager, AuthMethod, JwtConfig, JwtManager},
cli::{Cli, OutputFormat, OutputFormatExtended},
client::HttpClient,
debug::DebugConfig,
distributed::{
CoordinatorConfig, DistributedClient, DistributedCoordinator, DistributedWorker,
WorkerConfig,
},
endpoints::MultiEndpointConfig,
export::CsvExporter,
get_process_memory_usage, get_system_memory_info,
grafana::GrafanaManager,
load_tester::LoadTester,
prometheus::PrometheusExporter,
prometheus_server::PrometheusServer,
ramp_up::RampUpConfig,
rate_limiter::RequestRateLimiter,
scenario::Scenario,
stats::{AlertConfig, FinalSummary, StatsCollector},
tui::TuiApp,
user_agent::UserAgentManager,
websocket::{TestConfig, WebSocketMessage, WebSocketServer},
webui::start_web_server,
Http2Config, TlsConfig,
};
use std::sync::Arc;
use std::time::Duration;
use tokio::signal;
use tokio::sync::broadcast;
fn open_browser(url: &str) -> Result<()> {
#[cfg(target_os = "windows")]
{
std::process::Command::new("cmd")
.args(["/c", "start", url])
.spawn()?;
}
#[cfg(target_os = "macos")]
{
std::process::Command::new("open").arg(url).spawn()?;
}
#[cfg(target_os = "linux")]
{
std::process::Command::new("xdg-open").arg(url).spawn()?;
}
Ok(())
}
fn setup_authentication(cli: &Cli) -> Result<AuthMethod> {
if cli.jwt_token.is_some() || cli.jwt_secret.is_some() {
let jwt_config = JwtConfig {
token: cli.jwt_token.clone(),
secret: cli.jwt_secret.clone(),
algorithm: jsonwebtoken::Algorithm::HS256,
issuer: cli.jwt_issuer.clone(),
audience: cli.jwt_audience.clone(),
refresh_threshold_minutes: 5,
auto_refresh: cli.jwt_auto_refresh,
refresh_endpoint: cli.jwt_refresh_endpoint.clone(),
};
let jwt_manager = JwtManager::new(jwt_config);
return Ok(AuthMethod::Jwt(jwt_manager));
}
if let Some(api_key) = &cli.api_key {
let api_key_config = ApiKeyConfig {
api_key: api_key.clone(),
header_name: cli.api_key_header.clone(),
location: cli.api_key_location.to_auth_location(),
};
let api_key_manager = ApiKeyManager::new(api_key_config);
return Ok(AuthMethod::ApiKey(api_key_manager));
}
Ok(AuthMethod::None)
}
fn should_print_intro(cli: &Cli) -> bool {
if cli.is_quiet() {
return false;
}
if let Some(print) = &cli.print {
let parts: Vec<&str> = print.split(',').map(|s| s.trim()).collect();
parts
.iter()
.any(|&p| matches!(p, "intro" | "i" | "i,p,r" | "intro,progress,result"))
} else {
true }
}
fn should_print_result(cli: &Cli) -> bool {
if cli.is_quiet() {
return false;
}
if let Some(print) = &cli.print {
let parts: Vec<&str> = print.split(',').map(|s| s.trim()).collect();
parts
.iter()
.any(|&p| matches!(p, "result" | "r" | "i,p,r" | "intro,progress,result"))
} else {
true }
}
fn create_http2_config(cli: &Cli) -> Result<Http2Config> {
if cli.http2 && cli.http1_only {
return Err(anyhow::anyhow!(
"Cannot specify both --http2 and --http1-only"
));
}
let mut config = if cli.http1_only {
Http2Config::http1_only()
} else if cli.http2 {
if cli.http2_prior_knowledge {
Http2Config::with_prior_knowledge()
} else {
Http2Config::enabled()
}
} else {
Http2Config::default()
};
if let (Some(conn_window), Some(stream_window)) = (
cli.http2_initial_connection_window_size,
cli.http2_initial_stream_window_size,
) {
config.initial_connection_window_size = Some(conn_window);
config.initial_stream_window_size = Some(stream_window);
} else {
if let Some(conn_window) = cli.http2_initial_connection_window_size {
config.initial_connection_window_size = Some(conn_window);
}
if let Some(stream_window) = cli.http2_initial_stream_window_size {
config.initial_stream_window_size = Some(stream_window);
}
}
if let Some(frame_size) = cli.http2_max_frame_size {
config.max_frame_size = Some(frame_size);
}
config.validate()?;
Ok(config)
}
fn create_tls_config(cli: &Cli) -> Result<TlsConfig> {
let mut config = TlsConfig::new();
if cli.insecure {
config.insecure = true;
}
if let (Some(cert), Some(key)) = (&cli.cert, &cli.key) {
config.cert_path = Some(cert.clone());
config.key_path = Some(key.clone());
}
config.validate()?;
Ok(config)
}
fn print_examples() {
println!("📚 Pulzr Load Testing Examples\n");
println!("🚀 Basic Usage:");
println!(" # Simple load test");
println!(" pulzr https://httpbin.org/get -c 10 -d 30");
println!(" ");
println!(" # Request count mode");
println!(" pulzr https://httpbin.org/get -c 10 -n 500");
println!();
println!("🌐 WebUI Testing:");
println!(" # WebUI with auto-open browser");
println!(" pulzr https://httpbin.org/get --webui --open-browser -c 20 -d 60");
println!(" ");
println!(" # WebUI with request count");
println!(" pulzr https://httpbin.org/get --webui -c 15 -n 1000");
println!();
println!("📊 Output Formats:");
println!(" # JSON output for automation");
println!(" pulzr https://api.example.com --format json -c 10 -n 100");
println!(" ");
println!(" # Compact output with latencies");
println!(" pulzr https://api.example.com --format plain-text --latencies -c 25 -d 30");
println!(" ");
println!(" # Silent mode");
println!(" pulzr https://api.example.com --no-print -c 10 -n 100");
println!();
println!("⚡ Performance Testing:");
println!(" # High-throughput test");
println!(" pulzr https://api.example.com -c 100 -r 500 -d 300 --headless");
println!(" ");
println!(" # Request count with rate limiting");
println!(" pulzr https://api.example.com -c 50 -n 10000 -r 200 --output perf_test");
println!();
println!("🔧 HTTP Methods & Headers:");
println!(" # POST with JSON payload");
println!(" pulzr https://httpbin.org/post --method POST \\");
println!(" --body '{{\"name\": \"test\", \"value\": 123}}' \\");
println!(" --headers 'Content-Type: application/json' -c 5 -n 10");
println!(" ");
println!(" # Custom headers and User-Agent");
println!(" pulzr https://api.example.com \\");
println!(" --headers 'Authorization: Bearer token123' \\");
println!(" --user-agent 'MyApp/1.0' -c 10 -d 30");
println!();
println!("🎯 Advanced Features:");
println!(" # Random User-Agent rotation");
println!(" pulzr https://httpbin.org/user-agent --random-ua -c 5 -d 60");
println!(" ");
println!(" # CSV export with detailed logging");
println!(" pulzr https://api.example.com -c 20 -d 120 --output test_results");
println!(" ");
println!(" # HTTP/2 with multiplexing");
println!(" pulzr https://http2.github.io --http2 -c 50 -d 30");
println!();
println!("🔒 TLS & Security:");
println!(" # Skip certificate verification (insecure)");
println!(" pulzr https://self-signed.badssl.com --insecure -c 5 -n 10");
println!(" ");
println!(" # Client certificate authentication");
println!(" pulzr https://api.example.com --cert client.pem --key client.key -c 10 -d 30");
println!(" ");
println!(" # Client cert with insecure mode");
println!(
" pulzr https://api.example.com --cert client.pem --key client.key --insecure -c 5 -n 100"
);
println!();
println!("🔗 Integration & CI/CD:");
println!(" # CI/CD automation");
println!(" pulzr $API_ENDPOINT --headless -c 5 -n 100 --timeout 10 --output ci_results");
println!(" ");
println!(" # Health check monitoring");
println!(" pulzr https://api.example.com --websocket --headless -c 3 -d 60");
println!();
println!("🌐 Distributed Load Testing:");
println!(" # Start coordinator (on main machine)");
println!(" pulzr --coordinator --coordinator-port 9630");
println!(" ");
println!(" # Start workers (on different machines)");
println!(" pulzr --worker --coordinator-host 192.168.1.100 --worker-id worker-1");
println!(" pulzr --worker --coordinator-host 192.168.1.100 --worker-id worker-2");
println!(" ");
println!(" # Custom worker configuration");
println!(" pulzr --worker --coordinator-host coordinator.example.com \\");
println!(" --worker-max-concurrent 500 --worker-max-rps 1000");
println!();
println!("📋 Alternative Syntax");
println!(" # Positional URL with enhanced flags");
println!(" pulzr -c 25 -n 1000 --latencies --format json https://example.com");
println!(" ");
println!(" # Compatible flag names");
println!(" pulzr https://example.com --connections 50 --rate 100 --timeout 15");
println!();
println!("💡 Pro Tips:");
println!(" • Use --webui for visual monitoring and real-time request logs");
println!(" • Use -n for exact request counts, -d for time-based testing");
println!(" • Use --format json for automation and CI/CD pipelines");
println!(" • Use --random-ua for more realistic traffic simulation");
println!(" • Use --output to export detailed CSV reports");
println!(" • Use --insecure for testing self-signed certificates");
println!(" • Use --cert/--key for client certificate authentication");
println!(" • Use --coordinator to run distributed coordinator");
println!(" • Use --worker to run distributed worker nodes");
println!();
println!("📖 For more examples and documentation:");
println!(" https://github.com/yourusername/pulzr");
}
async fn handle_distributed_mode(cli: &Cli) -> Result<()> {
let stats_collector = Arc::new(StatsCollector::new());
if cli.is_coordinator_mode() {
println!("🎯 Starting Distributed Load Testing Coordinator");
let config = CoordinatorConfig {
coordinator_id: format!("coordinator-{}", uuid::Uuid::new_v4()),
port: cli.coordinator_port,
max_workers: cli.max_workers,
heartbeat_timeout_secs: 30,
heartbeat_check_interval_secs: 10,
auto_balance_load: true,
};
let mut coordinator = DistributedCoordinator::new(config, stats_collector);
let actual_port = coordinator.start().await?;
println!("Coordinator started on port: {}", actual_port);
println!(
"Workers can connect using: --worker --coordinator-host <host>:{}",
actual_port
);
println!("Press Ctrl+C to shutdown");
signal::ctrl_c().await?;
println!("\nShutdown signal received");
} else if cli.is_worker_mode() {
println!("🔧 Starting Distributed Load Testing Worker");
let worker_id = cli
.worker_id
.clone()
.unwrap_or_else(|| format!("worker-{}", uuid::Uuid::new_v4()));
let config = WorkerConfig {
worker_id: worker_id.clone(),
coordinator_host: cli.get_coordinator_host(),
coordinator_port: cli.coordinator_port,
max_concurrent_requests: cli.worker_max_concurrent,
max_rps: cli.worker_max_rps,
heartbeat_interval_secs: 10,
metrics_interval_secs: 5,
connection_retry_interval_secs: 5,
max_connection_retries: 10,
connection_timeout_secs: 30,
};
println!("Worker ID: {}", worker_id);
println!(
"Connecting to coordinator: {}:{}",
config.coordinator_host, config.coordinator_port
);
let mut worker = DistributedWorker::new(config, stats_collector);
worker.start().await?;
} else if cli.is_distributed_client_mode() {
println!("📡 Starting Distributed Load Testing Client");
let coordinator_host = cli.get_coordinator_host();
let _client = DistributedClient::new(&coordinator_host, cli.coordinator_port);
println!(
"Connecting to coordinator: {}:{}",
coordinator_host, cli.coordinator_port
);
println!("Distributed client mode is a basic implementation for demonstration");
println!("Use --coordinator and --worker flags to set up distributed testing");
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
if cli.examples {
print_examples();
std::process::exit(0);
}
if let Err(e) = cli.validate_distributed_config() {
eprintln!("Error: {}", e);
std::process::exit(1);
}
if cli.is_distributed_mode() {
return handle_distributed_mode(&cli).await;
}
if cli.memory_optimize
&& cli.get_url().is_none()
&& cli.scenario.is_none()
&& cli.endpoints.is_none()
{
println!("🔧 Memory Optimization Demo Mode");
println!("This would demonstrate memory optimization features.");
println!(
"Current memory profile: {}",
cli.memory_profile.as_deref().unwrap_or("default")
);
println!("Max results in memory: {}", cli.max_results);
println!("Max result age: {}s", cli.max_result_age);
println!("Auto cleanup: {}", cli.auto_cleanup);
println!("Cleanup interval: {}s", cli.cleanup_interval);
match get_system_memory_info() {
Ok(info) => info.print_summary(),
Err(e) => println!("Could not get system memory info: {}", e),
}
if let Ok(process_mem) = get_process_memory_usage() {
println!("Current process memory: {:.2} MB", process_mem);
}
println!("\n💡 Tips for memory optimization:");
println!(" --memory-profile streaming # For continuous long-running tests");
println!(" --memory-profile low-memory # For memory-constrained environments");
println!(" --memory-profile high-throughput # For high-performance testing");
println!(" --max-results 1000 # Limit memory usage");
println!(" --auto-cleanup # Enable periodic cleanup");
return Ok(());
}
if cli.list_dashboards
|| cli.dashboard_info.is_some()
|| cli.dashboard_import.is_some()
|| cli.dashboard_validate.is_some()
{
let grafana_manager = GrafanaManager::new(cli.dashboards_dir.clone());
if cli.list_dashboards {
println!("📊 Available Grafana Dashboards:");
match grafana_manager.list_dashboards() {
Ok(dashboards) => {
if dashboards.is_empty() {
println!(
" No dashboards found in directory: {}",
cli.dashboards_dir.as_deref().unwrap_or("dashboards")
);
} else {
for dashboard in dashboards {
if let Ok(info) = grafana_manager.get_dashboard_info(&dashboard) {
info.print_summary();
println!();
}
}
}
}
Err(e) => eprintln!("Error listing dashboards: {}", e),
}
return Ok(());
}
if let Some(dashboard_name) = &cli.dashboard_info {
match grafana_manager.get_dashboard_info(dashboard_name) {
Ok(info) => {
info.print_summary();
}
Err(e) => eprintln!("Error getting dashboard info: {}", e),
}
return Ok(());
}
if let Some(dashboard_name) = &cli.dashboard_import {
match grafana_manager.generate_import_instructions(dashboard_name) {
Ok(instructions) => {
println!("{}", instructions);
}
Err(e) => eprintln!("Error generating import instructions: {}", e),
}
return Ok(());
}
if let Some(dashboard_name) = &cli.dashboard_validate {
match grafana_manager.validate_dashboard(dashboard_name) {
Ok(issues) => {
if issues.is_empty() {
println!("✅ Dashboard '{}' is valid", dashboard_name);
} else {
println!("❌ Dashboard '{}' has issues:", dashboard_name);
for issue in issues {
println!(" - {}", issue);
}
}
}
Err(e) => eprintln!("Error validating dashboard: {}", e),
}
return Ok(());
}
}
let scenario = if let Some(scenario_path) = &cli.scenario {
Some(Scenario::load_from_file(scenario_path)?)
} else {
None
};
let endpoints = if let Some(endpoints_path) = &cli.endpoints {
let config = MultiEndpointConfig::load_from_file(endpoints_path)?;
config.validate()?;
Some(config)
} else {
None
};
if cli.get_url().is_none() && scenario.is_none() && endpoints.is_none() {
eprintln!("Error: Either --url, --scenario, or --endpoints must be provided");
std::process::exit(1);
}
let alert_config = {
let mut cfg = AlertConfig::default();
if let Some(v) = cli.alert_error_rate {
cfg.error_rate_threshold = v;
}
if let Some(v) = cli.alert_latency {
cfg.response_time_threshold = v;
}
cfg
};
let make_stats = || Arc::new(StatsCollector::new().with_alert_config(alert_config.clone()));
let (stats_collector, websocket_server, websocket_sender) = if cli.websocket || cli.webui {
let stats_collector = make_stats();
let ws_server = WebSocketServer::new(cli.websocket_port, Arc::clone(&stats_collector));
let sender = ws_server.get_message_sender();
let stats_with_ws = Arc::new(stats_collector.clone_with_websocket_sender(sender.clone()));
(stats_with_ws, Some(ws_server), Some(sender))
} else {
(make_stats(), None, None)
};
let user_agent_manager = Arc::new(UserAgentManager::new(
cli.user_agent.clone(),
cli.random_ua,
cli.ua_file.as_deref(),
)?);
let auth_method = Arc::new(setup_authentication(&cli)?);
let http2_config = Arc::new(create_http2_config(&cli)?);
let tls_config = Arc::new(create_tls_config(&cli)?);
let client = Arc::new(HttpClient::new(
cli.get_url()
.cloned()
.unwrap_or_else(|| "http://placeholder.com".to_string()),
cli.method.to_reqwest_method(),
cli.headers.clone(),
cli.get_body(),
Arc::clone(&user_agent_manager),
Arc::clone(&stats_collector),
cli.timeout.map(Duration::from_secs),
Arc::clone(&auth_method),
Arc::clone(&http2_config),
Arc::clone(&tls_config),
!cli.no_follow_redirects,
cli.retry,
cli.expect_body.clone(),
)?);
let rate_limiter = Arc::new(RequestRateLimiter::new(cli.rps));
let mut load_tester = LoadTester::new(
Arc::clone(&client),
Arc::clone(&rate_limiter),
Arc::clone(&stats_collector),
cli.concurrent,
cli.duration.map(Duration::from_secs),
)
.with_total_requests(cli.requests);
if let Some(ref scenario) = scenario {
load_tester = load_tester.with_scenario(Arc::new(scenario.clone()));
}
if let Some(ref endpoints) = endpoints {
load_tester = load_tester.with_endpoints(Arc::new(endpoints.clone()));
}
if let Some(ramp_duration) = cli.ramp_up {
let ramp_config = RampUpConfig::new(
Duration::from_secs(ramp_duration),
cli.ramp_pattern.clone(),
cli.concurrent,
);
load_tester = load_tester.with_ramp_up(ramp_config);
}
let debug_config = DebugConfig::new(cli.debug, cli.debug_level);
load_tester = load_tester.with_debug(debug_config);
let (actual_websocket_port, actual_webui_port) = if let Some(mut ws_server) = websocket_server {
let last_test_config = ws_server.get_last_test_config();
let ws_port = tokio::spawn(async move {
match ws_server.start().await {
Ok(port) => Some(port),
Err(e) => {
eprintln!("WebSocket server error: {}", e);
None
}
}
})
.await
.unwrap_or(None);
let webui_port = if cli.webui {
let webui_port = cli.webui_port;
tokio::spawn(async move {
match start_web_server(webui_port, ws_port).await {
Ok(port) => Some(port),
Err(e) => {
eprintln!("WebUI server error: {}", e);
None
}
}
})
.await
.unwrap_or(None)
} else {
None
};
let config = TestConfig {
url: cli
.url
.clone()
.unwrap_or_else(|| "Scenario Mode".to_string()),
concurrent_requests: cli.concurrent,
rps: cli.rps,
duration_secs: cli.duration.unwrap_or(0),
total_requests: cli.requests,
method: format!("{:?}", cli.method),
user_agent_mode: if cli.random_ua {
format!("Random ({} agents)", user_agent_manager.get_agent_count())
} else if cli.user_agent.is_some() {
"Custom".to_string()
} else {
"Default (pulzr)".to_string()
},
};
if let Some(sender) = &websocket_sender {
let config_clone = config.clone();
let state = Arc::clone(&last_test_config);
tokio::spawn(async move {
*state.lock().await = Some(config_clone);
});
let message = WebSocketMessage::TestStarted {
timestamp: chrono::Utc::now(),
config,
};
let _ = sender.send(message);
}
(ws_port, webui_port)
} else {
(None, None)
};
let (quit_sender, quit_receiver) = broadcast::channel(1);
let prometheus_handle = if cli.prometheus {
let prometheus_exporter = Arc::new(PrometheusExporter::new(Arc::clone(&stats_collector))?);
let prometheus_server = PrometheusServer::new(cli.prometheus_port, prometheus_exporter);
let quit_rx = quit_sender.subscribe();
Some(tokio::spawn(async move {
if let Err(e) = prometheus_server.start(quit_rx).await {
eprintln!("Prometheus server error: {}", e);
}
}))
} else {
None
};
let tui_handle = if !cli.headless && !cli.webui {
let mut tui_app =
TuiApp::new(Arc::clone(&stats_collector)).with_quit_sender(quit_sender.clone());
Some(tokio::spawn(async move {
if let Err(e) = tui_app.run().await {
eprintln!("TUI error: {}", e);
}
}))
} else {
None
};
let quit_sender_clone = quit_sender.clone();
tokio::spawn(async move {
if (signal::ctrl_c().await).is_ok() {
let _ = quit_sender_clone.send(());
}
});
if should_print_intro(&cli) {
println!("Starting load test...");
if let Some(url) = cli.get_url() {
println!("URL: {}", url);
} else if let Some(scenario) = &scenario {
println!(
"Scenario: {} ({} steps)",
scenario.name,
scenario.steps.len()
);
} else if let Some(endpoints) = &endpoints {
println!(
"Endpoints: {} ({} endpoints)",
endpoints.name,
endpoints.endpoints.len()
);
for endpoint in &endpoints.endpoints {
println!(
" - {}: {} {}",
endpoint.name,
endpoint.get_method(&endpoints.defaults),
endpoint.url
);
}
}
}
if should_print_intro(&cli) {
println!("Method: {:?}", cli.method);
}
if should_print_intro(&cli) {
if let Some(ramp_duration) = cli.ramp_up {
println!(
"Ramp-up: {} pattern over {}s to {} concurrent",
format!("{:?}", cli.ramp_pattern).to_lowercase(),
ramp_duration,
cli.concurrent
);
} else {
println!("Concurrent requests: {}", cli.concurrent);
}
} else {
println!("Concurrent requests: {}", cli.concurrent);
}
if let Some(rps) = cli.rps {
println!("RPS limit: {}", rps);
}
if let Some(total_requests) = cli.requests {
println!("Total requests: {}", total_requests);
} else if let Some(duration) = cli.duration {
println!("Duration: {}s", duration);
} else {
println!("Duration: Until stopped (Ctrl+C or 'q')");
}
println!(
"User-Agent mode: {}",
if cli.random_ua {
format!("Random ({} agents)", user_agent_manager.get_agent_count())
} else if cli.user_agent.is_some() {
"Custom".to_string()
} else {
"Default (pulzr)".to_string()
}
);
let protocol_info = http2_config.get_protocol_info();
println!("Protocol: {}", protocol_info.protocol);
if !protocol_info.features.is_empty() {
println!("Features: {}", protocol_info.features.join(", "));
}
let tls_info = tls_config.get_summary();
println!("TLS Mode: {}", tls_info.mode);
if tls_info.client_cert {
if let Some(cert_path) = &tls_info.cert_path {
println!("Client Certificate: {}", cert_path.display());
}
}
if cli.debug {
println!(
"Debug mode: Level {} ({})",
cli.debug_level,
match cli.debug_level {
1 => "Basic request/response info",
2 => "Include headers",
3 => "Full request/response details",
_ => "Basic request/response info",
}
);
}
if let Some(port) = actual_websocket_port {
println!("WebSocket server: ws://127.0.0.1:{}", port);
}
if let Some(port) = actual_webui_port {
let webui_url = format!("http://127.0.0.1:{}", port);
println!("🌐 WebUI available at: {}", webui_url);
if cli.open_browser {
if let Err(e) = open_browser(&webui_url) {
println!(" Could not auto-open browser: {}", e);
println!(" Please open the above URL manually in your browser");
} else {
println!(" Opening in your default browser...");
}
} else {
println!(" Open the above URL in your browser to view the dashboard");
}
}
if cli.prometheus {
println!(
"📊 Prometheus metrics available at: http://127.0.0.1:{}/metrics",
cli.prometheus_port
);
}
if !cli.headless && !cli.webui {
println!("TUI: Press 'q' or Ctrl+C to quit early");
} else {
println!("Press Ctrl+C to quit early");
}
println!("\nTest running...\n");
if !cli.headless
&& !cli.webui
&& (actual_websocket_port.is_some() || actual_webui_port.is_some())
{
println!("Starting TUI in 3 seconds...");
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
}
if let Some(warmup_secs) = cli.warmup {
if !cli.is_quiet() {
println!("Warming up for {}s...", warmup_secs);
}
let warmup_stats = Arc::new(StatsCollector::new());
let warmup_client = Arc::new(HttpClient::new(
cli.get_url()
.cloned()
.unwrap_or_else(|| "http://placeholder.com".to_string()),
cli.method.to_reqwest_method(),
cli.headers.clone(),
cli.get_body(),
Arc::clone(&user_agent_manager),
Arc::clone(&warmup_stats),
cli.timeout.map(Duration::from_secs),
Arc::clone(&auth_method),
Arc::clone(&http2_config),
Arc::clone(&tls_config),
!cli.no_follow_redirects,
cli.retry,
cli.expect_body.clone(),
)?);
let warmup_limiter = Arc::new(RequestRateLimiter::new(cli.rps));
let warmup_tester = LoadTester::new(
warmup_client,
warmup_limiter,
warmup_stats,
cli.concurrent,
Some(Duration::from_secs(warmup_secs)),
);
let (_, warmup_quit) = broadcast::channel(1);
let _ = warmup_tester.run_test(warmup_quit).await;
if !cli.is_quiet() {
println!("Warmup done, starting test...\n");
}
}
let progress_handle = if cli.headless && !cli.is_quiet() {
let stats_clone = Arc::clone(&stats_collector);
let total_reqs = cli.requests;
let duration_secs = cli.duration;
let test_start = std::time::Instant::now();
Some(tokio::spawn(async move {
use std::io::Write;
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let m = stats_clone.get_live_metrics().await;
let elapsed = test_start.elapsed().as_secs();
let progress = if let Some(total) = total_reqs {
format!("{}/{}", m.requests_sent, total)
} else if let Some(d) = duration_secs {
format!("{}s/{}s", elapsed, d)
} else {
format!("{}s", elapsed)
};
let ok = m.requests_completed.saturating_sub(m.requests_failed);
let pct = if m.requests_completed > 0 {
ok as f64 / m.requests_completed as f64 * 100.0
} else {
100.0
};
print!(
"\r [{}] {} reqs | {:.1} rps | avg {:.0}ms | {:.1}% ok ",
progress, m.requests_sent, m.current_rps, m.avg_response_time, pct
);
let _ = std::io::stdout().flush();
}
}))
} else {
None
};
let fail_on_error_handle = if let Some(threshold) = cli.fail_on_error_rate {
let stats_clone = Arc::clone(&stats_collector);
let quit_tx = quit_sender.clone();
Some(tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let m = stats_clone.get_live_metrics().await;
if m.requests_completed >= 10 {
let err_rate = m.requests_failed as f64 / m.requests_completed as f64 * 100.0;
if err_rate > threshold {
eprintln!(
"\nError rate {:.1}% exceeded threshold {:.1}% -- aborting",
err_rate, threshold
);
let _ = quit_tx.send(());
break;
}
}
}
}))
} else {
None
};
if let Err(e) = load_tester.run_test(quit_receiver).await {
eprintln!("Load test error: {}", e);
}
if let Some(h) = progress_handle {
h.abort();
}
if let Some(h) = fail_on_error_handle {
h.abort();
}
if cli.headless && !cli.is_quiet() {
println!();
}
if let Some(handle) = tui_handle {
handle.abort();
}
if let Some(handle) = prometheus_handle {
handle.abort();
}
let final_summary = stats_collector.get_final_summary().await;
if let Some(sender) = &websocket_sender {
let message = WebSocketMessage::TestCompleted {
timestamp: chrono::Utc::now(),
summary: final_summary.clone(),
};
let _ = sender.send(message);
}
let exporter = CsvExporter::new(Arc::clone(&stats_collector));
if let Some(ref path) = cli.output_file {
match std::fs::File::create(path) {
Ok(file) => {
if let Err(e) = serde_json::to_writer_pretty(file, &final_summary) {
eprintln!("Failed to write output file: {}", e);
} else if !cli.is_quiet() {
println!("Summary written to: {}", path.display());
}
}
Err(e) => eprintln!("Failed to create output file: {}", e),
}
}
if let Some(ref path) = cli.save_baseline {
match std::fs::File::create(path) {
Ok(file) => {
if let Err(e) = serde_json::to_writer_pretty(file, &final_summary) {
eprintln!("Failed to write baseline: {}", e);
} else if !cli.is_quiet() {
println!("Baseline saved to: {}", path.display());
}
}
Err(e) => eprintln!("Failed to create baseline file: {}", e),
}
}
let mut baseline_failed = false;
if let Some(ref path) = cli.compare_baseline {
match std::fs::read_to_string(path) {
Ok(content) => match serde_json::from_str::<FinalSummary>(&content) {
Ok(baseline) => {
baseline_failed = compare_baseline(&baseline, &final_summary);
}
Err(e) => eprintln!("Failed to parse baseline: {}", e),
},
Err(e) => eprintln!("Failed to read baseline file: {}", e),
}
}
if cli.json_output {
exporter.print_json_summary(&final_summary);
}
if should_print_result(&cli) && !cli.json_output {
let target_url = cli
.get_url()
.map(|s| s.as_str())
.unwrap_or("multiple endpoints");
let duration_val = cli.duration.map(|d| d as f64);
if let Some(format) = &cli.format {
match format {
OutputFormatExtended::PlainText => {
if cli.latencies {
exporter.print_compact_summary(
&final_summary,
target_url,
cli.concurrent,
duration_val,
);
} else {
exporter.print_summary(&final_summary);
}
}
OutputFormatExtended::Json => {
exporter.print_json_summary(&final_summary);
}
}
} else {
match cli.output_format {
OutputFormat::Detailed => {
exporter.print_summary(&final_summary);
}
OutputFormat::Compact => {
exporter.print_compact_summary(
&final_summary,
target_url,
cli.concurrent,
duration_val,
);
}
OutputFormat::Minimal => {
exporter.print_minimal_summary(&final_summary);
}
}
}
} else if !cli.is_quiet() && !cli.json_output {
exporter.print_minimal_summary(&final_summary);
}
if let Some(ref output_path) = cli.output {
if !cli.is_quiet() {
println!("\nExporting results to CSV...");
}
let detailed_path = output_path.with_extension("detailed.csv");
let summary_path = output_path.with_extension("summary.csv");
if let Err(e) = exporter.export_detailed_results(&detailed_path).await {
eprintln!("Failed to export detailed results: {}", e);
} else if !cli.is_quiet() {
println!("Detailed results: {}", detailed_path.display());
}
if let Err(e) = exporter.export_summary(&summary_path).await {
eprintln!("Failed to export summary: {}", e);
} else if !cli.is_quiet() {
println!("Summary: {}", summary_path.display());
}
}
if should_print_result(&cli) {
println!("\nLoad test completed!");
}
if baseline_failed
|| cli.fail_on_error_rate.is_some_and(|_| {
let total = final_summary.total_requests;
let failed = final_summary.failed_requests;
if total > 0 {
let rate = failed as f64 / total as f64 * 100.0;
rate > cli.fail_on_error_rate.unwrap()
} else {
false
}
})
{
std::process::exit(1);
}
Ok(())
}
fn compare_baseline(baseline: &FinalSummary, current: &FinalSummary) -> bool {
const REGRESSION_PCT: f64 = 20.0;
let mut failed = false;
let checks: &[(&str, f64, f64)] = &[
(
"avg_response_ms",
baseline.avg_response_time,
current.avg_response_time,
),
(
"p99_response_ms",
baseline.p99_response_time as f64,
current.p99_response_time as f64,
),
(
"error_rate_%",
{
if baseline.total_requests > 0 {
baseline.failed_requests as f64 / baseline.total_requests as f64 * 100.0
} else {
0.0
}
},
{
if current.total_requests > 0 {
current.failed_requests as f64 / current.total_requests as f64 * 100.0
} else {
0.0
}
},
),
];
println!("\nBaseline comparison:");
println!(
" {:<22} {:>12} {:>12} {:>10}",
"metric", "baseline", "current", "change"
);
println!(" {}", "-".repeat(60));
for (name, base_val, cur_val) in checks {
let change = if *base_val > 0.0 {
(cur_val - base_val) / base_val * 100.0
} else {
0.0
};
let regressed = change > REGRESSION_PCT;
let marker = if regressed { " REGRESSED" } else { "" };
println!(
" {:<22} {:>12.1} {:>12.1} {:>+9.1}%{}",
name, base_val, cur_val, change, marker
);
if regressed {
failed = true;
}
}
println!();
if failed {
eprintln!(
"Baseline regression detected (threshold: +{:.0}%)",
REGRESSION_PCT
);
}
failed
}