mockforge_bench/
executor.rs1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
11fn extract_failure_json(line: &str) -> Option<String> {
17 let marker = "MOCKFORGE_FAILURE:";
18 let start = line.find(marker)?;
19 let json_start = start + marker.len();
20 let json_str = &line[json_start..];
21 let json_str = json_str.strip_suffix("\" source=console").unwrap_or(json_str).trim();
23 if json_str.is_empty() {
24 return None;
25 }
26 if json_str.starts_with('{') && json_str.contains("\\\"") {
30 Some(json_str.replace("\\\\", "\\").replace("\\\"", "\""))
31 } else {
32 Some(json_str.to_string())
33 }
34}
35
36pub struct K6Executor {
38 k6_path: String,
39}
40
41impl K6Executor {
42 pub fn new() -> Result<Self> {
44 let k6_path = which::which("k6")
45 .map_err(|_| BenchError::K6NotFound)?
46 .to_string_lossy()
47 .to_string();
48
49 Ok(Self { k6_path })
50 }
51
52 pub fn is_k6_installed() -> bool {
54 which::which("k6").is_ok()
55 }
56
57 pub async fn get_version(&self) -> Result<String> {
59 let output = TokioCommand::new(&self.k6_path)
60 .arg("version")
61 .output()
62 .await
63 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
64
65 if !output.status.success() {
66 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
67 }
68
69 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
70 }
71
72 pub async fn execute(
74 &self,
75 script_path: &Path,
76 output_dir: Option<&Path>,
77 verbose: bool,
78 ) -> Result<K6Results> {
79 println!("Starting load test...\n");
80
81 let mut cmd = TokioCommand::new(&self.k6_path);
82 cmd.arg("run");
83
84 if let Some(dir) = output_dir {
86 let summary_path = dir.join("summary.json");
87 cmd.arg("--summary-export").arg(summary_path);
88 }
89
90 if verbose {
92 cmd.arg("--verbose");
93 }
94
95 cmd.arg(script_path);
96 cmd.stdout(Stdio::piped());
97 cmd.stderr(Stdio::piped());
98
99 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
100
101 let stdout = child
102 .stdout
103 .take()
104 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
105
106 let stderr = child
107 .stderr
108 .take()
109 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
110
111 let stdout_reader = BufReader::new(stdout);
113 let stderr_reader = BufReader::new(stderr);
114
115 let mut stdout_lines = stdout_reader.lines();
116 let mut stderr_lines = stderr_reader.lines();
117
118 let spinner = ProgressBar::new_spinner();
120 spinner.set_style(
121 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
122 );
123 spinner.set_message("Running load test...");
124
125 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
128 Arc::new(tokio::sync::Mutex::new(Vec::new()));
129 let fd_stdout = Arc::clone(&failure_details);
130 let fd_stderr = Arc::clone(&failure_details);
131
132 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
134 Arc::new(tokio::sync::Mutex::new(Vec::new()));
135 let log_stdout = Arc::clone(&log_lines);
136 let log_stderr = Arc::clone(&log_lines);
137
138 let stdout_handle = tokio::spawn(async move {
140 while let Ok(Some(line)) = stdout_lines.next_line().await {
141 log_stdout.lock().await.push(format!("[stdout] {}", line));
142 if let Some(json_str) = extract_failure_json(&line) {
143 fd_stdout.lock().await.push(json_str);
144 } else {
145 spinner.set_message(line.clone());
146 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
147 println!("{}", line);
148 }
149 }
150 }
151 spinner.finish_and_clear();
152 });
153
154 let stderr_handle = tokio::spawn(async move {
156 while let Ok(Some(line)) = stderr_lines.next_line().await {
157 if !line.is_empty() {
158 log_stderr.lock().await.push(format!("[stderr] {}", line));
159 if let Some(json_str) = extract_failure_json(&line) {
160 fd_stderr.lock().await.push(json_str);
161 } else {
162 eprintln!("{}", line);
163 }
164 }
165 }
166 });
167
168 let status =
170 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
171
172 let _ = stdout_handle.await;
174 let _ = stderr_handle.await;
175
176 if !status.success() {
177 return Err(BenchError::K6ExecutionFailed(format!(
178 "k6 exited with status: {}",
179 status
180 )));
181 }
182
183 if let Some(dir) = output_dir {
185 let details = failure_details.lock().await;
186 if !details.is_empty() {
187 let failure_path = dir.join("conformance-failure-details.json");
188 let parsed: Vec<serde_json::Value> =
189 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
190 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
191 let _ = std::fs::write(&failure_path, json);
192 }
193 }
194
195 let lines = log_lines.lock().await;
197 if !lines.is_empty() {
198 let log_path = dir.join("k6-output.log");
199 let _ = std::fs::write(&log_path, lines.join("\n"));
200 println!("k6 output log saved to: {}", log_path.display());
201 }
202 }
203
204 let results = if let Some(dir) = output_dir {
206 Self::parse_results(dir)?
207 } else {
208 K6Results::default()
209 };
210
211 Ok(results)
212 }
213
214 fn parse_results(output_dir: &Path) -> Result<K6Results> {
216 let summary_path = output_dir.join("summary.json");
217
218 if !summary_path.exists() {
219 return Ok(K6Results::default());
220 }
221
222 let content = std::fs::read_to_string(summary_path)
223 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
224
225 let json: serde_json::Value = serde_json::from_str(&content)
226 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
227
228 let duration_values = &json["metrics"]["http_req_duration"]["values"];
229
230 Ok(K6Results {
231 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
232 failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
233 .as_u64()
234 .unwrap_or(0),
235 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
236 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
237 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
238 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
239 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
240 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
241 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
242 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
243 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
244 })
245 }
246}
247
248impl Default for K6Executor {
249 fn default() -> Self {
250 Self::new().expect("k6 not found")
251 }
252}
253
254#[derive(Debug, Clone, Default)]
256pub struct K6Results {
257 pub total_requests: u64,
258 pub failed_requests: u64,
259 pub avg_duration_ms: f64,
260 pub p95_duration_ms: f64,
261 pub p99_duration_ms: f64,
262 pub rps: f64,
263 pub vus_max: u32,
264 pub min_duration_ms: f64,
265 pub max_duration_ms: f64,
266 pub med_duration_ms: f64,
267 pub p90_duration_ms: f64,
268}
269
270impl K6Results {
271 pub fn error_rate(&self) -> f64 {
273 if self.total_requests == 0 {
274 return 0.0;
275 }
276 (self.failed_requests as f64 / self.total_requests as f64) * 100.0
277 }
278
279 pub fn success_rate(&self) -> f64 {
281 100.0 - self.error_rate()
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288
289 #[test]
290 fn test_k6_results_error_rate() {
291 let results = K6Results {
292 total_requests: 100,
293 failed_requests: 5,
294 avg_duration_ms: 100.0,
295 p95_duration_ms: 200.0,
296 p99_duration_ms: 300.0,
297 ..Default::default()
298 };
299
300 assert_eq!(results.error_rate(), 5.0);
301 assert_eq!(results.success_rate(), 95.0);
302 }
303
304 #[test]
305 fn test_k6_results_zero_requests() {
306 let results = K6Results::default();
307 assert_eq!(results.error_rate(), 0.0);
308 }
309
310 #[test]
311 fn test_extract_failure_json_raw() {
312 let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
313 let result = extract_failure_json(line).unwrap();
314 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
315 assert_eq!(parsed["check"], "test");
316 }
317
318 #[test]
319 fn test_extract_failure_json_logfmt() {
320 let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
321 let result = extract_failure_json(line).unwrap();
322 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
323 assert_eq!(parsed["check"], "test");
324 assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
325 }
326
327 #[test]
328 fn test_extract_failure_json_no_marker() {
329 assert!(extract_failure_json("just a regular log line").is_none());
330 }
331}