mockforge_bench/
executor.rs1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
11fn extract_failure_json(line: &str) -> Option<String> {
17 let marker = "MOCKFORGE_FAILURE:";
18 let start = line.find(marker)?;
19 let json_start = start + marker.len();
20 let json_str = &line[json_start..];
21 let json_str = json_str.strip_suffix("\" source=console").unwrap_or(json_str).trim();
23 if json_str.is_empty() {
24 return None;
25 }
26 if json_str.starts_with('{') && json_str.contains("\\\"") {
30 Some(json_str.replace("\\\\", "\\").replace("\\\"", "\""))
31 } else {
32 Some(json_str.to_string())
33 }
34}
35
36pub struct K6Executor {
38 k6_path: String,
39}
40
41impl K6Executor {
42 pub fn new() -> Result<Self> {
44 let k6_path = which::which("k6")
45 .map_err(|_| BenchError::K6NotFound)?
46 .to_string_lossy()
47 .to_string();
48
49 Ok(Self { k6_path })
50 }
51
52 pub fn is_k6_installed() -> bool {
54 which::which("k6").is_ok()
55 }
56
57 pub async fn get_version(&self) -> Result<String> {
59 let output = TokioCommand::new(&self.k6_path)
60 .arg("version")
61 .output()
62 .await
63 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
64
65 if !output.status.success() {
66 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
67 }
68
69 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
70 }
71
72 pub async fn execute(
79 &self,
80 script_path: &Path,
81 output_dir: Option<&Path>,
82 verbose: bool,
83 ) -> Result<K6Results> {
84 self.execute_with_port(script_path, output_dir, verbose, None).await
85 }
86
87 pub async fn execute_with_port(
89 &self,
90 script_path: &Path,
91 output_dir: Option<&Path>,
92 verbose: bool,
93 api_port: Option<u16>,
94 ) -> Result<K6Results> {
95 println!("Starting load test...\n");
96
97 let mut cmd = TokioCommand::new(&self.k6_path);
98 cmd.arg("run");
99
100 if let Some(port) = api_port {
103 cmd.arg("--address").arg(format!("localhost:{}", port));
104 }
105
106 if let Some(dir) = output_dir {
108 let summary_path = dir.join("summary.json");
109 cmd.arg("--summary-export").arg(summary_path);
110 }
111
112 if verbose {
114 cmd.arg("--verbose");
115 }
116
117 let abs_script =
119 std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
120 cmd.arg(&abs_script);
121
122 if let Some(dir) = output_dir {
125 cmd.current_dir(dir);
126 }
127
128 cmd.stdout(Stdio::piped());
129 cmd.stderr(Stdio::piped());
130
131 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
132
133 let stdout = child
134 .stdout
135 .take()
136 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
137
138 let stderr = child
139 .stderr
140 .take()
141 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
142
143 let stdout_reader = BufReader::new(stdout);
145 let stderr_reader = BufReader::new(stderr);
146
147 let mut stdout_lines = stdout_reader.lines();
148 let mut stderr_lines = stderr_reader.lines();
149
150 let spinner = ProgressBar::new_spinner();
152 spinner.set_style(
153 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
154 );
155 spinner.set_message("Running load test...");
156
157 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
160 Arc::new(tokio::sync::Mutex::new(Vec::new()));
161 let fd_stdout = Arc::clone(&failure_details);
162 let fd_stderr = Arc::clone(&failure_details);
163
164 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
166 Arc::new(tokio::sync::Mutex::new(Vec::new()));
167 let log_stdout = Arc::clone(&log_lines);
168 let log_stderr = Arc::clone(&log_lines);
169
170 let stdout_handle = tokio::spawn(async move {
172 while let Ok(Some(line)) = stdout_lines.next_line().await {
173 log_stdout.lock().await.push(format!("[stdout] {}", line));
174 if let Some(json_str) = extract_failure_json(&line) {
175 fd_stdout.lock().await.push(json_str);
176 } else {
177 spinner.set_message(line.clone());
178 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
179 println!("{}", line);
180 }
181 }
182 }
183 spinner.finish_and_clear();
184 });
185
186 let stderr_handle = tokio::spawn(async move {
188 while let Ok(Some(line)) = stderr_lines.next_line().await {
189 if !line.is_empty() {
190 log_stderr.lock().await.push(format!("[stderr] {}", line));
191 if let Some(json_str) = extract_failure_json(&line) {
192 fd_stderr.lock().await.push(json_str);
193 } else {
194 eprintln!("{}", line);
195 }
196 }
197 }
198 });
199
200 let status =
202 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
203
204 let _ = stdout_handle.await;
206 let _ = stderr_handle.await;
207
208 let exit_code = status.code().unwrap_or(-1);
211 if !status.success() && exit_code != 99 {
212 return Err(BenchError::K6ExecutionFailed(format!(
213 "k6 exited with status: {}",
214 status
215 )));
216 }
217 if exit_code == 99 {
218 tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
219 }
220
221 if let Some(dir) = output_dir {
223 let details = failure_details.lock().await;
224 if !details.is_empty() {
225 let failure_path = dir.join("conformance-failure-details.json");
226 let parsed: Vec<serde_json::Value> =
227 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
228 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
229 let _ = std::fs::write(&failure_path, json);
230 }
231 }
232
233 let lines = log_lines.lock().await;
235 if !lines.is_empty() {
236 let log_path = dir.join("k6-output.log");
237 let _ = std::fs::write(&log_path, lines.join("\n"));
238 println!("k6 output log saved to: {}", log_path.display());
239 }
240 }
241
242 let results = if let Some(dir) = output_dir {
244 Self::parse_results(dir)?
245 } else {
246 K6Results::default()
247 };
248
249 Ok(results)
250 }
251
252 fn parse_results(output_dir: &Path) -> Result<K6Results> {
254 let summary_path = output_dir.join("summary.json");
255
256 if !summary_path.exists() {
257 return Ok(K6Results::default());
258 }
259
260 let content = std::fs::read_to_string(summary_path)
261 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
262
263 let json: serde_json::Value = serde_json::from_str(&content)
264 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
265
266 let duration_values = &json["metrics"]["http_req_duration"]["values"];
267
268 Ok(K6Results {
269 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
270 failed_requests: json["metrics"]["http_req_failed"]["values"]["fails"]
271 .as_u64()
272 .unwrap_or(0),
273 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
274 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
275 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
276 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
277 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
278 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
279 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
280 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
281 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
282 })
283 }
284}
285
286impl Default for K6Executor {
287 fn default() -> Self {
288 Self::new().expect("k6 not found")
289 }
290}
291
292#[derive(Debug, Clone, Default)]
294pub struct K6Results {
295 pub total_requests: u64,
296 pub failed_requests: u64,
297 pub avg_duration_ms: f64,
298 pub p95_duration_ms: f64,
299 pub p99_duration_ms: f64,
300 pub rps: f64,
301 pub vus_max: u32,
302 pub min_duration_ms: f64,
303 pub max_duration_ms: f64,
304 pub med_duration_ms: f64,
305 pub p90_duration_ms: f64,
306}
307
308impl K6Results {
309 pub fn error_rate(&self) -> f64 {
311 if self.total_requests == 0 {
312 return 0.0;
313 }
314 (self.failed_requests as f64 / self.total_requests as f64) * 100.0
315 }
316
317 pub fn success_rate(&self) -> f64 {
319 100.0 - self.error_rate()
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use super::*;
326
327 #[test]
328 fn test_k6_results_error_rate() {
329 let results = K6Results {
330 total_requests: 100,
331 failed_requests: 5,
332 avg_duration_ms: 100.0,
333 p95_duration_ms: 200.0,
334 p99_duration_ms: 300.0,
335 ..Default::default()
336 };
337
338 assert_eq!(results.error_rate(), 5.0);
339 assert_eq!(results.success_rate(), 95.0);
340 }
341
342 #[test]
343 fn test_k6_results_zero_requests() {
344 let results = K6Results::default();
345 assert_eq!(results.error_rate(), 0.0);
346 }
347
348 #[test]
349 fn test_extract_failure_json_raw() {
350 let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
351 let result = extract_failure_json(line).unwrap();
352 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
353 assert_eq!(parsed["check"], "test");
354 }
355
356 #[test]
357 fn test_extract_failure_json_logfmt() {
358 let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
359 let result = extract_failure_json(line).unwrap();
360 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
361 assert_eq!(parsed["check"], "test");
362 assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
363 }
364
365 #[test]
366 fn test_extract_failure_json_no_marker() {
367 assert!(extract_failure_json("just a regular log line").is_none());
368 }
369}