mockforge_bench/
executor.rs1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Pulls the payload that follows a `MOCKFORGE_FAILURE:` marker out of one k6 output line.
///
/// Everything after the first occurrence of the marker is taken as the payload. A trailing
/// `" source=console` (the tail of a logfmt-formatted line) is removed if present, and the
/// remainder is trimmed of surrounding whitespace.
///
/// When the payload starts with `{` and contains backslash-escaped quotes, it is treated as
/// an escaped string: `\\` is collapsed to `\` and then `\"` to `"`. Any other payload is
/// returned unchanged.
///
/// Returns `None` when the marker is absent or the payload is empty after trimming.
fn extract_failure_json(line: &str) -> Option<String> {
    const MARKER: &str = "MOCKFORGE_FAILURE:";
    const LOGFMT_TAIL: &str = "\" source=console";

    let (_, after_marker) = line.split_once(MARKER)?;
    let payload = after_marker.strip_suffix(LOGFMT_TAIL).unwrap_or(after_marker).trim();

    if payload.is_empty() {
        return None;
    }

    // Only brace-opened payloads carrying `\"` are considered escaped.
    let looks_escaped = payload.starts_with('{') && payload.contains("\\\"");

    Some(if looks_escaped {
        // Order matters: backslash pairs are collapsed before quotes are unescaped.
        payload.replace("\\\\", "\\").replace("\\\"", "\"")
    } else {
        payload.to_string()
    })
}
35
/// Runs the external `k6` load-testing binary.
pub struct K6Executor {
    /// Path of the `k6` executable that every command is launched with.
    k6_path: String,
}
40
41impl K6Executor {
42 pub fn new() -> Result<Self> {
44 let k6_path = which::which("k6")
45 .map_err(|_| BenchError::K6NotFound)?
46 .to_string_lossy()
47 .to_string();
48
49 Ok(Self { k6_path })
50 }
51
52 pub fn is_k6_installed() -> bool {
54 which::which("k6").is_ok()
55 }
56
57 pub async fn get_version(&self) -> Result<String> {
59 let output = TokioCommand::new(&self.k6_path)
60 .arg("version")
61 .output()
62 .await
63 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
64
65 if !output.status.success() {
66 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
67 }
68
69 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
70 }
71
72 pub async fn execute(
79 &self,
80 script_path: &Path,
81 output_dir: Option<&Path>,
82 verbose: bool,
83 ) -> Result<K6Results> {
84 self.execute_with_port(script_path, output_dir, verbose, None).await
85 }
86
87 pub async fn execute_with_port(
89 &self,
90 script_path: &Path,
91 output_dir: Option<&Path>,
92 verbose: bool,
93 api_port: Option<u16>,
94 ) -> Result<K6Results> {
95 println!("Starting load test...\n");
96
97 let mut cmd = TokioCommand::new(&self.k6_path);
98 cmd.arg("run");
99
100 if let Some(port) = api_port {
103 cmd.arg("--address").arg(format!("localhost:{}", port));
104 }
105
106 if verbose {
113 cmd.arg("--verbose");
114 }
115
116 let abs_script =
118 std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
119 cmd.arg(&abs_script);
120
121 if let Some(dir) = output_dir {
124 cmd.current_dir(dir);
125 }
126
127 cmd.stdout(Stdio::piped());
128 cmd.stderr(Stdio::piped());
129
130 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
131
132 let stdout = child
133 .stdout
134 .take()
135 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
136
137 let stderr = child
138 .stderr
139 .take()
140 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
141
142 let stdout_reader = BufReader::new(stdout);
144 let stderr_reader = BufReader::new(stderr);
145
146 let mut stdout_lines = stdout_reader.lines();
147 let mut stderr_lines = stderr_reader.lines();
148
149 let spinner = ProgressBar::new_spinner();
151 spinner.set_style(
152 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
153 );
154 spinner.set_message("Running load test...");
155
156 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
159 Arc::new(tokio::sync::Mutex::new(Vec::new()));
160 let fd_stdout = Arc::clone(&failure_details);
161 let fd_stderr = Arc::clone(&failure_details);
162
163 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
165 Arc::new(tokio::sync::Mutex::new(Vec::new()));
166 let log_stdout = Arc::clone(&log_lines);
167 let log_stderr = Arc::clone(&log_lines);
168
169 let stdout_handle = tokio::spawn(async move {
171 while let Ok(Some(line)) = stdout_lines.next_line().await {
172 log_stdout.lock().await.push(format!("[stdout] {}", line));
173 if let Some(json_str) = extract_failure_json(&line) {
174 fd_stdout.lock().await.push(json_str);
175 } else {
176 spinner.set_message(line.clone());
177 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
178 println!("{}", line);
179 }
180 }
181 }
182 spinner.finish_and_clear();
183 });
184
185 let stderr_handle = tokio::spawn(async move {
187 while let Ok(Some(line)) = stderr_lines.next_line().await {
188 if !line.is_empty() {
189 log_stderr.lock().await.push(format!("[stderr] {}", line));
190 if let Some(json_str) = extract_failure_json(&line) {
191 fd_stderr.lock().await.push(json_str);
192 } else {
193 eprintln!("{}", line);
194 }
195 }
196 }
197 });
198
199 let status =
201 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
202
203 let _ = stdout_handle.await;
205 let _ = stderr_handle.await;
206
207 let exit_code = status.code().unwrap_or(-1);
210 if !status.success() && exit_code != 99 {
211 return Err(BenchError::K6ExecutionFailed(format!(
212 "k6 exited with status: {}",
213 status
214 )));
215 }
216 if exit_code == 99 {
217 tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
218 }
219
220 if let Some(dir) = output_dir {
222 let details = failure_details.lock().await;
223 if !details.is_empty() {
224 let failure_path = dir.join("conformance-failure-details.json");
225 let parsed: Vec<serde_json::Value> =
226 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
227 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
228 let _ = std::fs::write(&failure_path, json);
229 }
230 }
231
232 let lines = log_lines.lock().await;
234 if !lines.is_empty() {
235 let log_path = dir.join("k6-output.log");
236 let _ = std::fs::write(&log_path, lines.join("\n"));
237 println!("k6 output log saved to: {}", log_path.display());
238 }
239 }
240
241 let results = if let Some(dir) = output_dir {
243 Self::parse_results(dir)?
244 } else {
245 K6Results::default()
246 };
247
248 Ok(results)
249 }
250
251 fn parse_results(output_dir: &Path) -> Result<K6Results> {
253 let summary_path = output_dir.join("summary.json");
254
255 if !summary_path.exists() {
256 return Ok(K6Results::default());
257 }
258
259 let content = std::fs::read_to_string(summary_path)
260 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
261
262 let json: serde_json::Value = serde_json::from_str(&content)
263 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
264
265 let duration_values = &json["metrics"]["http_req_duration"]["values"];
266
267 Ok(K6Results {
268 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
269 failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
273 .as_u64()
274 .unwrap_or(0),
275 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
276 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
277 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
278 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
279 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
280 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
281 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
282 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
283 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
284 })
285 }
286}
287
288impl Default for K6Executor {
289 fn default() -> Self {
290 Self::new().expect("k6 not found")
291 }
292}
293
/// Metrics gathered from a k6 run.
///
/// The `Default` value has every field set to zero.
#[derive(Debug, Clone, Default)]
pub struct K6Results {
    /// Number of requests issued.
    pub total_requests: u64,
    /// Number of requests counted as failed.
    pub failed_requests: u64,
    /// Average request duration.
    pub avg_duration_ms: f64,
    /// 95th-percentile request duration.
    pub p95_duration_ms: f64,
    /// 99th-percentile request duration.
    pub p99_duration_ms: f64,
    /// Request rate.
    pub rps: f64,
    /// Maximum number of virtual users.
    pub vus_max: u32,
    /// Minimum request duration.
    pub min_duration_ms: f64,
    /// Maximum request duration.
    pub max_duration_ms: f64,
    /// Median request duration.
    pub med_duration_ms: f64,
    /// 90th-percentile request duration.
    pub p90_duration_ms: f64,
}

impl K6Results {
    /// Percentage (0–100) of requests that failed.
    ///
    /// Returns `0.0` when no requests were made, so the division is never by zero.
    pub fn error_rate(&self) -> f64 {
        match self.total_requests {
            0 => 0.0,
            total => {
                let failed_fraction = self.failed_requests as f64 / total as f64;
                failed_fraction * 100.0
            }
        }
    }

    /// Percentage (0–100) of requests that did not fail: `100.0 - error_rate()`.
    pub fn success_rate(&self) -> f64 {
        let failed_pct = self.error_rate();
        100.0 - failed_pct
    }
}
324
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the extractor on `line` and parses the payload as JSON, panicking if either
    /// step fails.
    fn extract_and_parse(line: &str) -> serde_json::Value {
        let payload = extract_failure_json(line).unwrap();
        serde_json::from_str(&payload).unwrap()
    }

    /// Five failures out of one hundred requests give a 5% error rate and 95% success rate.
    #[test]
    fn test_k6_results_error_rate() {
        let sample = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..Default::default()
        };

        assert_eq!(sample.error_rate(), 5.0);
        assert_eq!(sample.success_rate(), 95.0);
    }

    /// With no requests at all the error rate is zero rather than a division by zero.
    #[test]
    fn test_k6_results_zero_requests() {
        let empty = K6Results::default();
        assert_eq!(empty.error_rate(), 0.0);
    }

    /// A marker followed directly by plain JSON yields that JSON.
    #[test]
    fn test_extract_failure_json_raw() {
        let raw_line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
        let value = extract_and_parse(raw_line);
        assert_eq!(value["check"], "test");
    }

    /// A logfmt-wrapped line with escaped quotes is unescaped, including a nested JSON
    /// string carried in the response body.
    #[test]
    fn test_extract_failure_json_logfmt() {
        let logfmt_line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
        let value = extract_and_parse(logfmt_line);
        assert_eq!(value["check"], "test");
        assert_eq!(value["response"]["body"], r#"{"key":"val"}"#);
    }

    /// Lines without the marker produce nothing.
    #[test]
    fn test_extract_failure_json_no_marker() {
        assert!(extract_failure_json("just a regular log line").is_none());
    }
}