use crate::error::{BenchError, Result};
use indicatif::{ProgressBar, ProgressStyle};
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command as TokioCommand;
fn extract_exchange_json(line: &str) -> Option<String> {
let marker = "MOCKFORGE_EXCHANGE:";
let start = line.find(marker)?;
let json_start = start + marker.len();
let json_str = &line[json_start..];
let json_str = json_str.strip_suffix("\" source=console").unwrap_or(json_str).trim();
if json_str.is_empty() {
return None;
}
if json_str.starts_with('{') && json_str.contains("\\\"") {
Some(json_str.replace("\\\\", "\\").replace("\\\"", "\""))
} else {
Some(json_str.to_string())
}
}
fn extract_failure_json(line: &str) -> Option<String> {
let marker = "MOCKFORGE_FAILURE:";
let start = line.find(marker)?;
let json_start = start + marker.len();
let json_str = &line[json_start..];
let json_str = json_str.strip_suffix("\" source=console").unwrap_or(json_str).trim();
if json_str.is_empty() {
return None;
}
if json_str.starts_with('{') && json_str.contains("\\\"") {
Some(json_str.replace("\\\\", "\\").replace("\\\"", "\""))
} else {
Some(json_str.to_string())
}
}
pub struct K6Executor {
k6_path: String,
}
impl K6Executor {
pub fn new() -> Result<Self> {
let k6_path = which::which("k6")
.map_err(|_| BenchError::K6NotFound)?
.to_string_lossy()
.to_string();
Ok(Self { k6_path })
}
pub fn is_k6_installed() -> bool {
which::which("k6").is_ok()
}
pub async fn get_version(&self) -> Result<String> {
let output = TokioCommand::new(&self.k6_path)
.arg("version")
.output()
.await
.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
if !output.status.success() {
return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
}
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
pub async fn execute(
&self,
script_path: &Path,
output_dir: Option<&Path>,
verbose: bool,
) -> Result<K6Results> {
self.execute_with_port(script_path, output_dir, verbose, None).await
}
pub async fn execute_with_port(
&self,
script_path: &Path,
output_dir: Option<&Path>,
verbose: bool,
api_port: Option<u16>,
) -> Result<K6Results> {
println!("Starting load test...\n");
let mut cmd = TokioCommand::new(&self.k6_path);
cmd.arg("run");
if let Some(port) = api_port {
cmd.arg("--address").arg(format!("localhost:{}", port));
}
if verbose {
cmd.arg("--verbose");
}
let abs_script =
std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
cmd.arg(&abs_script);
if let Some(dir) = output_dir {
cmd.current_dir(dir);
}
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
let stdout_reader = BufReader::new(stdout);
let stderr_reader = BufReader::new(stderr);
let mut stdout_lines = stdout_reader.lines();
let mut stderr_lines = stderr_reader.lines();
let spinner = ProgressBar::new_spinner();
spinner.set_style(
ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
);
spinner.set_message("Running load test...");
let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let fd_stdout = Arc::clone(&failure_details);
let fd_stderr = Arc::clone(&failure_details);
let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let ex_stdout = Arc::clone(&exchange_details);
let ex_stderr = Arc::clone(&exchange_details);
let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
Arc::new(tokio::sync::Mutex::new(Vec::new()));
let log_stdout = Arc::clone(&log_lines);
let log_stderr = Arc::clone(&log_lines);
let stdout_handle = tokio::spawn(async move {
while let Ok(Some(line)) = stdout_lines.next_line().await {
log_stdout.lock().await.push(format!("[stdout] {}", line));
if let Some(json_str) = extract_failure_json(&line) {
fd_stdout.lock().await.push(json_str);
} else if let Some(json_str) = extract_exchange_json(&line) {
ex_stdout.lock().await.push(json_str);
} else {
spinner.set_message(line.clone());
if !line.is_empty() && !line.contains("running") && !line.contains("default") {
println!("{}", line);
}
}
}
spinner.finish_and_clear();
});
let stderr_handle = tokio::spawn(async move {
while let Ok(Some(line)) = stderr_lines.next_line().await {
if !line.is_empty() {
log_stderr.lock().await.push(format!("[stderr] {}", line));
if let Some(json_str) = extract_failure_json(&line) {
fd_stderr.lock().await.push(json_str);
} else if let Some(json_str) = extract_exchange_json(&line) {
ex_stderr.lock().await.push(json_str);
} else {
eprintln!("{}", line);
}
}
}
});
let status =
child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
let _ = stdout_handle.await;
let _ = stderr_handle.await;
let exit_code = status.code().unwrap_or(-1);
if !status.success() && exit_code != 99 {
return Err(BenchError::K6ExecutionFailed(format!(
"k6 exited with status: {}",
status
)));
}
if exit_code == 99 {
tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
}
if let Some(dir) = output_dir {
let details = failure_details.lock().await;
if !details.is_empty() {
let failure_path = dir.join("conformance-failure-details.json");
let parsed: Vec<serde_json::Value> =
details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
if let Ok(json) = serde_json::to_string_pretty(&parsed) {
let _ = std::fs::write(&failure_path, json);
}
}
let exchanges = exchange_details.lock().await;
if !exchanges.is_empty() {
let exchange_path = dir.join("conformance-requests.json");
let parsed: Vec<serde_json::Value> =
exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
if let Ok(json) = serde_json::to_string_pretty(&parsed) {
let _ = std::fs::write(&exchange_path, json);
tracing::info!(
"Exported {} request/response pairs to {}",
parsed.len(),
exchange_path.display()
);
}
}
let lines = log_lines.lock().await;
if !lines.is_empty() {
let log_path = dir.join("k6-output.log");
let _ = std::fs::write(&log_path, lines.join("\n"));
println!("k6 output log saved to: {}", log_path.display());
}
}
let results = if let Some(dir) = output_dir {
Self::parse_results(dir)?
} else {
K6Results::default()
};
Ok(results)
}
fn parse_results(output_dir: &Path) -> Result<K6Results> {
let summary_path = output_dir.join("summary.json");
if !summary_path.exists() {
return Ok(K6Results::default());
}
let content = std::fs::read_to_string(summary_path)
.map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
let json: serde_json::Value = serde_json::from_str(&content)
.map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
let duration_values = &json["metrics"]["http_req_duration"]["values"];
Ok(K6Results {
total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
.as_u64()
.unwrap_or(0),
avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
})
}
}
impl Default for K6Executor {
fn default() -> Self {
Self::new().expect("k6 not found")
}
}
#[derive(Debug, Clone, Default)]
pub struct K6Results {
pub total_requests: u64,
pub failed_requests: u64,
pub avg_duration_ms: f64,
pub p95_duration_ms: f64,
pub p99_duration_ms: f64,
pub rps: f64,
pub vus_max: u32,
pub min_duration_ms: f64,
pub max_duration_ms: f64,
pub med_duration_ms: f64,
pub p90_duration_ms: f64,
}
impl K6Results {
pub fn error_rate(&self) -> f64 {
if self.total_requests == 0 {
return 0.0;
}
(self.failed_requests as f64 / self.total_requests as f64) * 100.0
}
pub fn success_rate(&self) -> f64 {
100.0 - self.error_rate()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_k6_results_error_rate() {
let results = K6Results {
total_requests: 100,
failed_requests: 5,
avg_duration_ms: 100.0,
p95_duration_ms: 200.0,
p99_duration_ms: 300.0,
..Default::default()
};
assert_eq!(results.error_rate(), 5.0);
assert_eq!(results.success_rate(), 95.0);
}
#[test]
fn test_k6_results_zero_requests() {
let results = K6Results::default();
assert_eq!(results.error_rate(), 0.0);
}
#[test]
fn test_extract_failure_json_raw() {
let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
let result = extract_failure_json(line).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["check"], "test");
}
#[test]
fn test_extract_failure_json_logfmt() {
let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
let result = extract_failure_json(line).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
assert_eq!(parsed["check"], "test");
assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
}
#[test]
fn test_extract_failure_json_no_marker() {
assert!(extract_failure_json("just a regular log line").is_none());
}
}