mockforge_bench/
executor.rs1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Extracts the JSON payload that follows a `MOCKFORGE_FAILURE:` marker in one
/// line of k6 output.
///
/// Two line shapes are recognised:
///
/// * **raw** – `MOCKFORGE_FAILURE:{"check":...}`; the payload is returned
///   unchanged.
/// * **logfmt** – `... msg="MOCKFORGE_FAILURE:{\"check\":...} " source=console`;
///   the trailing `" source=console` is removed and the `\\` / `\"` escapes
///   added by the logfmt quoting are undone.
///
/// Returns `None` when the marker is absent or nothing but whitespace follows
/// it. The returned text is not validated as JSON.
fn extract_failure_json(line: &str) -> Option<String> {
    const MARKER: &str = "MOCKFORGE_FAILURE:";
    const LOGFMT_TAIL: &str = "\" source=console";

    let (_, after_marker) = line.split_once(MARKER)?;
    let payload = after_marker.strip_suffix(LOGFMT_TAIL).unwrap_or(after_marker).trim();
    if payload.is_empty() {
        return None;
    }

    // Inside a logfmt-quoted message every `"` is backslash-escaped, so the
    // very first quote is preceded by `\`. In raw JSON the first quote opens a
    // key and is never escaped. Merely checking "contains \"" would also match
    // raw JSON whose string values hold escaped quotes and would corrupt it.
    let first_quote_is_escaped = match payload.find('"') {
        Some(idx) => payload[..idx].ends_with('\\'),
        None => false,
    };

    if payload.starts_with('{') && first_quote_is_escaped {
        // Order matters: collapse escaped backslashes first, then the
        // backslash directly in front of each quote is the logfmt escape.
        Some(payload.replace("\\\\", "\\").replace("\\\"", "\""))
    } else {
        Some(payload.to_string())
    }
}
35
/// Runs the external `k6` load-testing binary and collects its results.
pub struct K6Executor {
    /// Filesystem path of the `k6` executable that is spawned for every
    /// command.
    k6_path: String,
}
40
41impl K6Executor {
42 pub fn new() -> Result<Self> {
44 let k6_path = which::which("k6")
45 .map_err(|_| BenchError::K6NotFound)?
46 .to_string_lossy()
47 .to_string();
48
49 Ok(Self { k6_path })
50 }
51
52 pub fn is_k6_installed() -> bool {
54 which::which("k6").is_ok()
55 }
56
57 pub async fn get_version(&self) -> Result<String> {
59 let output = TokioCommand::new(&self.k6_path)
60 .arg("version")
61 .output()
62 .await
63 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
64
65 if !output.status.success() {
66 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
67 }
68
69 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
70 }
71
72 pub async fn execute(
74 &self,
75 script_path: &Path,
76 output_dir: Option<&Path>,
77 verbose: bool,
78 ) -> Result<K6Results> {
79 println!("Starting load test...\n");
80
81 let mut cmd = TokioCommand::new(&self.k6_path);
82 cmd.arg("run");
83
84 if let Some(dir) = output_dir {
86 let summary_path = dir.join("summary.json");
87 cmd.arg("--summary-export").arg(summary_path);
88 }
89
90 if verbose {
92 cmd.arg("--verbose");
93 }
94
95 cmd.arg(script_path);
96 cmd.stdout(Stdio::piped());
97 cmd.stderr(Stdio::piped());
98
99 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
100
101 let stdout = child
102 .stdout
103 .take()
104 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
105
106 let stderr = child
107 .stderr
108 .take()
109 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
110
111 let stdout_reader = BufReader::new(stdout);
113 let stderr_reader = BufReader::new(stderr);
114
115 let mut stdout_lines = stdout_reader.lines();
116 let mut stderr_lines = stderr_reader.lines();
117
118 let spinner = ProgressBar::new_spinner();
120 spinner.set_style(
121 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
122 );
123 spinner.set_message("Running load test...");
124
125 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
128 Arc::new(tokio::sync::Mutex::new(Vec::new()));
129 let fd_stdout = Arc::clone(&failure_details);
130 let fd_stderr = Arc::clone(&failure_details);
131
132 let stdout_handle = tokio::spawn(async move {
134 while let Ok(Some(line)) = stdout_lines.next_line().await {
135 if let Some(json_str) = extract_failure_json(&line) {
136 fd_stdout.lock().await.push(json_str);
137 } else {
138 spinner.set_message(line.clone());
139 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
140 println!("{}", line);
141 }
142 }
143 }
144 spinner.finish_and_clear();
145 });
146
147 let stderr_handle = tokio::spawn(async move {
149 while let Ok(Some(line)) = stderr_lines.next_line().await {
150 if !line.is_empty() {
151 if let Some(json_str) = extract_failure_json(&line) {
152 fd_stderr.lock().await.push(json_str);
153 } else {
154 eprintln!("{}", line);
155 }
156 }
157 }
158 });
159
160 let status =
162 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
163
164 let _ = stdout_handle.await;
166 let _ = stderr_handle.await;
167
168 if !status.success() {
169 return Err(BenchError::K6ExecutionFailed(format!(
170 "k6 exited with status: {}",
171 status
172 )));
173 }
174
175 if let Some(dir) = output_dir {
177 let details = failure_details.lock().await;
178 if !details.is_empty() {
179 let failure_path = dir.join("conformance-failure-details.json");
180 let parsed: Vec<serde_json::Value> =
181 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
182 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
183 let _ = std::fs::write(&failure_path, json);
184 }
185 }
186 }
187
188 let results = if let Some(dir) = output_dir {
190 Self::parse_results(dir)?
191 } else {
192 K6Results::default()
193 };
194
195 Ok(results)
196 }
197
198 fn parse_results(output_dir: &Path) -> Result<K6Results> {
200 let summary_path = output_dir.join("summary.json");
201
202 if !summary_path.exists() {
203 return Ok(K6Results::default());
204 }
205
206 let content = std::fs::read_to_string(summary_path)
207 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
208
209 let json: serde_json::Value = serde_json::from_str(&content)
210 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
211
212 let duration_values = &json["metrics"]["http_req_duration"]["values"];
213
214 Ok(K6Results {
215 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
216 failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
217 .as_u64()
218 .unwrap_or(0),
219 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
220 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
221 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
222 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
223 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
224 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
225 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
226 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
227 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
228 })
229 }
230}
231
232impl Default for K6Executor {
233 fn default() -> Self {
234 Self::new().expect("k6 not found")
235 }
236}
237
/// Headline figures of a k6 run; every field defaults to zero.
#[derive(Clone, Debug, Default)]
pub struct K6Results {
    /// Total number of HTTP requests.
    pub total_requests: u64,
    /// Number of HTTP requests counted as failed.
    pub failed_requests: u64,
    /// Average request duration, in milliseconds.
    pub avg_duration_ms: f64,
    /// 95th-percentile request duration, in milliseconds.
    pub p95_duration_ms: f64,
    /// 99th-percentile request duration, in milliseconds.
    pub p99_duration_ms: f64,
    /// Request rate (requests per second).
    pub rps: f64,
    /// Maximum number of virtual users.
    pub vus_max: u32,
    /// Shortest request duration, in milliseconds.
    pub min_duration_ms: f64,
    /// Longest request duration, in milliseconds.
    pub max_duration_ms: f64,
    /// Median request duration, in milliseconds.
    pub med_duration_ms: f64,
    /// 90th-percentile request duration, in milliseconds.
    pub p90_duration_ms: f64,
}
253
254impl K6Results {
255 pub fn error_rate(&self) -> f64 {
257 if self.total_requests == 0 {
258 return 0.0;
259 }
260 (self.failed_requests as f64 / self.total_requests as f64) * 100.0
261 }
262
263 pub fn success_rate(&self) -> f64 {
265 100.0 - self.error_rate()
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// Extracts the failure payload from `line` and parses it as JSON,
    /// panicking if either step fails.
    fn extract_and_parse(line: &str) -> serde_json::Value {
        let payload = extract_failure_json(line).unwrap();
        serde_json::from_str(&payload).unwrap()
    }

    // 5 failures out of 100 requests is a 5 % error rate.
    #[test]
    fn test_k6_results_error_rate() {
        let sample = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..Default::default()
        };

        assert_eq!(sample.error_rate(), 5.0);
        assert_eq!(sample.success_rate(), 95.0);
    }

    // No requests at all must not divide by zero.
    #[test]
    fn test_k6_results_zero_requests() {
        assert_eq!(K6Results::default().error_rate(), 0.0);
    }

    // A bare marker followed by plain JSON.
    #[test]
    fn test_extract_failure_json_raw() {
        let doc = extract_and_parse(
            r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#,
        );
        assert_eq!(doc["check"], "test");
    }

    // A logfmt line whose message holds the escaped payload, including a
    // doubly-escaped nested JSON string.
    #[test]
    fn test_extract_failure_json_logfmt() {
        let doc = extract_and_parse(
            r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#,
        );
        assert_eq!(doc["check"], "test");
        assert_eq!(doc["response"]["body"], r#"{"key":"val"}"#);
    }

    // Lines without the marker are ignored.
    #[test]
    fn test_extract_failure_json_no_marker() {
        let outcome = extract_failure_json("just a regular log line");
        assert!(outcome.is_none());
    }
}