mockforge_bench/
executor.rs1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Extracts the JSON payload that follows a `MOCKFORGE_FAILURE:` marker in one line of k6 output.
///
/// Two line shapes are recognised:
///
/// * raw — `MOCKFORGE_FAILURE:{"check":"..."}`: the payload is returned verbatim (trimmed);
/// * logfmt — `... msg="MOCKFORGE_FAILURE:{\"check\":\"...\"}" source=console`: the trailing
///   `" source=console` is removed and the `\\` / `\"` escapes are undone.
///
/// Returns `None` when the marker is absent or only whitespace follows it.
fn extract_failure_json(line: &str) -> Option<String> {
    const MARKER: &str = "MOCKFORGE_FAILURE:";
    const LOGFMT_SUFFIX: &str = "\" source=console";

    let (_, after_marker) = line.split_once(MARKER)?;
    let payload = after_marker.strip_suffix(LOGFMT_SUFFIX).unwrap_or(after_marker).trim();
    if payload.is_empty() {
        return None;
    }

    // An escaped object opens with `{\"` (its first key quote is escaped). A raw JSON object can
    // never have a backslash as the first non-space character after `{`, so this test does not
    // fire on raw payloads whose string values merely *contain* `\"` — unescaping those would
    // strip legitimate JSON escapes and corrupt the document.
    let is_escaped = payload
        .strip_prefix('{')
        .map_or(false, |rest| rest.trim_start().starts_with("\\\""));

    if is_escaped {
        Some(payload.replace("\\\\", "\\").replace("\\\"", "\""))
    } else {
        Some(payload.to_string())
    }
}
35
/// Runs k6 scripts through the `k6` executable stored in `k6_path`.
#[derive(Debug, Clone)]
pub struct K6Executor {
    /// Path of the `k6` executable used for every invocation.
    k6_path: String,
}
40
41impl K6Executor {
42 pub fn new() -> Result<Self> {
44 let k6_path = which::which("k6")
45 .map_err(|_| BenchError::K6NotFound)?
46 .to_string_lossy()
47 .to_string();
48
49 Ok(Self { k6_path })
50 }
51
52 pub fn is_k6_installed() -> bool {
54 which::which("k6").is_ok()
55 }
56
57 pub async fn get_version(&self) -> Result<String> {
59 let output = TokioCommand::new(&self.k6_path)
60 .arg("version")
61 .output()
62 .await
63 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
64
65 if !output.status.success() {
66 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
67 }
68
69 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
70 }
71
72 pub async fn execute(
79 &self,
80 script_path: &Path,
81 output_dir: Option<&Path>,
82 verbose: bool,
83 ) -> Result<K6Results> {
84 self.execute_with_port(script_path, output_dir, verbose, None).await
85 }
86
87 pub async fn execute_with_port(
89 &self,
90 script_path: &Path,
91 output_dir: Option<&Path>,
92 verbose: bool,
93 api_port: Option<u16>,
94 ) -> Result<K6Results> {
95 println!("Starting load test...\n");
96
97 let mut cmd = TokioCommand::new(&self.k6_path);
98 cmd.arg("run");
99
100 if let Some(port) = api_port {
103 cmd.arg("--address").arg(format!("localhost:{}", port));
104 }
105
106 if let Some(dir) = output_dir {
108 let summary_path = dir.join("summary.json");
109 cmd.arg("--summary-export").arg(summary_path);
110 }
111
112 if verbose {
114 cmd.arg("--verbose");
115 }
116
117 cmd.arg(script_path);
118
119 if let Some(dir) = output_dir {
122 cmd.current_dir(dir);
123 }
124
125 cmd.stdout(Stdio::piped());
126 cmd.stderr(Stdio::piped());
127
128 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
129
130 let stdout = child
131 .stdout
132 .take()
133 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
134
135 let stderr = child
136 .stderr
137 .take()
138 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
139
140 let stdout_reader = BufReader::new(stdout);
142 let stderr_reader = BufReader::new(stderr);
143
144 let mut stdout_lines = stdout_reader.lines();
145 let mut stderr_lines = stderr_reader.lines();
146
147 let spinner = ProgressBar::new_spinner();
149 spinner.set_style(
150 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
151 );
152 spinner.set_message("Running load test...");
153
154 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
157 Arc::new(tokio::sync::Mutex::new(Vec::new()));
158 let fd_stdout = Arc::clone(&failure_details);
159 let fd_stderr = Arc::clone(&failure_details);
160
161 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
163 Arc::new(tokio::sync::Mutex::new(Vec::new()));
164 let log_stdout = Arc::clone(&log_lines);
165 let log_stderr = Arc::clone(&log_lines);
166
167 let stdout_handle = tokio::spawn(async move {
169 while let Ok(Some(line)) = stdout_lines.next_line().await {
170 log_stdout.lock().await.push(format!("[stdout] {}", line));
171 if let Some(json_str) = extract_failure_json(&line) {
172 fd_stdout.lock().await.push(json_str);
173 } else {
174 spinner.set_message(line.clone());
175 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
176 println!("{}", line);
177 }
178 }
179 }
180 spinner.finish_and_clear();
181 });
182
183 let stderr_handle = tokio::spawn(async move {
185 while let Ok(Some(line)) = stderr_lines.next_line().await {
186 if !line.is_empty() {
187 log_stderr.lock().await.push(format!("[stderr] {}", line));
188 if let Some(json_str) = extract_failure_json(&line) {
189 fd_stderr.lock().await.push(json_str);
190 } else {
191 eprintln!("{}", line);
192 }
193 }
194 }
195 });
196
197 let status =
199 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
200
201 let _ = stdout_handle.await;
203 let _ = stderr_handle.await;
204
205 let exit_code = status.code().unwrap_or(-1);
208 if !status.success() && exit_code != 99 {
209 return Err(BenchError::K6ExecutionFailed(format!(
210 "k6 exited with status: {}",
211 status
212 )));
213 }
214 if exit_code == 99 {
215 tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
216 }
217
218 if let Some(dir) = output_dir {
220 let details = failure_details.lock().await;
221 if !details.is_empty() {
222 let failure_path = dir.join("conformance-failure-details.json");
223 let parsed: Vec<serde_json::Value> =
224 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
225 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
226 let _ = std::fs::write(&failure_path, json);
227 }
228 }
229
230 let lines = log_lines.lock().await;
232 if !lines.is_empty() {
233 let log_path = dir.join("k6-output.log");
234 let _ = std::fs::write(&log_path, lines.join("\n"));
235 println!("k6 output log saved to: {}", log_path.display());
236 }
237 }
238
239 let results = if let Some(dir) = output_dir {
241 Self::parse_results(dir)?
242 } else {
243 K6Results::default()
244 };
245
246 Ok(results)
247 }
248
249 fn parse_results(output_dir: &Path) -> Result<K6Results> {
251 let summary_path = output_dir.join("summary.json");
252
253 if !summary_path.exists() {
254 return Ok(K6Results::default());
255 }
256
257 let content = std::fs::read_to_string(summary_path)
258 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
259
260 let json: serde_json::Value = serde_json::from_str(&content)
261 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
262
263 let duration_values = &json["metrics"]["http_req_duration"]["values"];
264
265 Ok(K6Results {
266 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
267 failed_requests: json["metrics"]["http_req_failed"]["values"]["fails"]
268 .as_u64()
269 .unwrap_or(0),
270 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
271 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
272 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
273 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
274 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
275 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
276 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
277 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
278 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
279 })
280 }
281}
282
283impl Default for K6Executor {
284 fn default() -> Self {
285 Self::new().expect("k6 not found")
286 }
287}
288
/// Aggregate figures of one k6 run.
///
/// `Default` yields all-zero results, which is also what callers receive when no summary is
/// available.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct K6Results {
    /// Number of HTTP requests issued.
    pub total_requests: u64,
    /// Number of HTTP requests counted as failed.
    pub failed_requests: u64,
    /// Average request duration, in milliseconds.
    pub avg_duration_ms: f64,
    /// 95th-percentile request duration, in milliseconds.
    pub p95_duration_ms: f64,
    /// 99th-percentile request duration, in milliseconds.
    pub p99_duration_ms: f64,
    /// Request rate reported for the run (requests per second).
    pub rps: f64,
    /// Maximum number of virtual users.
    pub vus_max: u32,
    /// Shortest request duration, in milliseconds.
    pub min_duration_ms: f64,
    /// Longest request duration, in milliseconds.
    pub max_duration_ms: f64,
    /// Median request duration, in milliseconds.
    pub med_duration_ms: f64,
    /// 90th-percentile request duration, in milliseconds.
    pub p90_duration_ms: f64,
}

impl K6Results {
    /// Share of failed requests as a percentage (`failed_requests / total_requests * 100`).
    ///
    /// Returns `0.0` when no requests were made, avoiding a division by zero.
    pub fn error_rate(&self) -> f64 {
        match self.total_requests {
            0 => 0.0,
            total => (self.failed_requests as f64 / total as f64) * 100.0,
        }
    }

    /// Share of successful requests as a percentage: `100.0 - error_rate()`.
    pub fn success_rate(&self) -> f64 {
        100.0 - self.error_rate()
    }
}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// 5 failures out of 100 requests is a 5% error rate and a 95% success rate.
    #[test]
    fn test_k6_results_error_rate() {
        let results = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..Default::default()
        };

        assert_eq!(results.error_rate(), 5.0);
        assert_eq!(results.success_rate(), 95.0);
    }

    /// With no requests at all the error rate is defined as zero (no division by zero).
    #[test]
    fn test_k6_results_zero_requests() {
        let results = K6Results::default();
        assert_eq!(results.error_rate(), 0.0);
        assert_eq!(results.success_rate(), 100.0);
    }

    /// A marker followed directly by JSON is returned as parseable JSON.
    #[test]
    fn test_extract_failure_json_raw() {
        let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
        let result = extract_failure_json(line).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
        assert_eq!(parsed["check"], "test");
    }

    /// A logfmt-wrapped payload has its suffix removed and its escapes undone, including the
    /// doubly escaped quotes of a JSON document embedded in a string value.
    #[test]
    fn test_extract_failure_json_logfmt() {
        let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
        let result = extract_failure_json(line).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
        assert_eq!(parsed["check"], "test");
        assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
    }

    /// Lines without the marker are not failure lines.
    #[test]
    fn test_extract_failure_json_no_marker() {
        assert!(extract_failure_json("just a regular log line").is_none());
    }

    /// A marker with nothing (or only whitespace / the logfmt suffix) after it yields no payload.
    #[test]
    fn test_extract_failure_json_empty_payload() {
        assert!(extract_failure_json("MOCKFORGE_FAILURE:").is_none());
        assert!(extract_failure_json("MOCKFORGE_FAILURE:   ").is_none());
        assert!(extract_failure_json(r#"msg="MOCKFORGE_FAILURE:" source=console"#).is_none());
    }
}