mockforge_bench/
executor.rs1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
11fn extract_exchange_json(line: &str) -> Option<String> {
13 let marker = "MOCKFORGE_EXCHANGE:";
14 let start = line.find(marker)?;
15 let json_start = start + marker.len();
16 let json_str = &line[json_start..];
17 let json_str = json_str.strip_suffix("\" source=console").unwrap_or(json_str).trim();
18 if json_str.is_empty() {
19 return None;
20 }
21 if json_str.starts_with('{') && json_str.contains("\\\"") {
22 Some(json_str.replace("\\\\", "\\").replace("\\\"", "\""))
23 } else {
24 Some(json_str.to_string())
25 }
26}
27
28fn extract_failure_json(line: &str) -> Option<String> {
34 let marker = "MOCKFORGE_FAILURE:";
35 let start = line.find(marker)?;
36 let json_start = start + marker.len();
37 let json_str = &line[json_start..];
38 let json_str = json_str.strip_suffix("\" source=console").unwrap_or(json_str).trim();
40 if json_str.is_empty() {
41 return None;
42 }
43 if json_str.starts_with('{') && json_str.contains("\\\"") {
47 Some(json_str.replace("\\\\", "\\").replace("\\\"", "\""))
48 } else {
49 Some(json_str.to_string())
50 }
51}
52
53pub struct K6Executor {
55 k6_path: String,
56}
57
58impl K6Executor {
59 pub fn new() -> Result<Self> {
61 let k6_path = which::which("k6")
62 .map_err(|_| BenchError::K6NotFound)?
63 .to_string_lossy()
64 .to_string();
65
66 Ok(Self { k6_path })
67 }
68
69 pub fn is_k6_installed() -> bool {
71 which::which("k6").is_ok()
72 }
73
74 pub async fn get_version(&self) -> Result<String> {
76 let output = TokioCommand::new(&self.k6_path)
77 .arg("version")
78 .output()
79 .await
80 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
81
82 if !output.status.success() {
83 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
84 }
85
86 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
87 }
88
89 pub async fn execute(
96 &self,
97 script_path: &Path,
98 output_dir: Option<&Path>,
99 verbose: bool,
100 ) -> Result<K6Results> {
101 self.execute_with_port(script_path, output_dir, verbose, None).await
102 }
103
104 pub async fn execute_with_port(
106 &self,
107 script_path: &Path,
108 output_dir: Option<&Path>,
109 verbose: bool,
110 api_port: Option<u16>,
111 ) -> Result<K6Results> {
112 println!("Starting load test...\n");
113
114 let mut cmd = TokioCommand::new(&self.k6_path);
115 cmd.arg("run");
116
117 if let Some(port) = api_port {
120 cmd.arg("--address").arg(format!("localhost:{}", port));
121 }
122
123 if verbose {
130 cmd.arg("--verbose");
131 }
132
133 let abs_script =
135 std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
136 cmd.arg(&abs_script);
137
138 if let Some(dir) = output_dir {
141 cmd.current_dir(dir);
142 }
143
144 cmd.stdout(Stdio::piped());
145 cmd.stderr(Stdio::piped());
146
147 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
148
149 let stdout = child
150 .stdout
151 .take()
152 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
153
154 let stderr = child
155 .stderr
156 .take()
157 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
158
159 let stdout_reader = BufReader::new(stdout);
161 let stderr_reader = BufReader::new(stderr);
162
163 let mut stdout_lines = stdout_reader.lines();
164 let mut stderr_lines = stderr_reader.lines();
165
166 let spinner = ProgressBar::new_spinner();
168 spinner.set_style(
169 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
170 );
171 spinner.set_message("Running load test...");
172
173 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
176 Arc::new(tokio::sync::Mutex::new(Vec::new()));
177 let fd_stdout = Arc::clone(&failure_details);
178 let fd_stderr = Arc::clone(&failure_details);
179
180 let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
182 Arc::new(tokio::sync::Mutex::new(Vec::new()));
183 let ex_stdout = Arc::clone(&exchange_details);
184 let ex_stderr = Arc::clone(&exchange_details);
185
186 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
188 Arc::new(tokio::sync::Mutex::new(Vec::new()));
189 let log_stdout = Arc::clone(&log_lines);
190 let log_stderr = Arc::clone(&log_lines);
191
192 let stdout_handle = tokio::spawn(async move {
194 while let Ok(Some(line)) = stdout_lines.next_line().await {
195 log_stdout.lock().await.push(format!("[stdout] {}", line));
196 if let Some(json_str) = extract_failure_json(&line) {
197 fd_stdout.lock().await.push(json_str);
198 } else if let Some(json_str) = extract_exchange_json(&line) {
199 ex_stdout.lock().await.push(json_str);
200 } else {
201 spinner.set_message(line.clone());
202 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
203 println!("{}", line);
204 }
205 }
206 }
207 spinner.finish_and_clear();
208 });
209
210 let stderr_handle = tokio::spawn(async move {
212 while let Ok(Some(line)) = stderr_lines.next_line().await {
213 if !line.is_empty() {
214 log_stderr.lock().await.push(format!("[stderr] {}", line));
215 if let Some(json_str) = extract_failure_json(&line) {
216 fd_stderr.lock().await.push(json_str);
217 } else if let Some(json_str) = extract_exchange_json(&line) {
218 ex_stderr.lock().await.push(json_str);
219 } else {
220 eprintln!("{}", line);
221 }
222 }
223 }
224 });
225
226 let status =
228 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
229
230 let _ = stdout_handle.await;
232 let _ = stderr_handle.await;
233
234 let exit_code = status.code().unwrap_or(-1);
237 if !status.success() && exit_code != 99 {
238 return Err(BenchError::K6ExecutionFailed(format!(
239 "k6 exited with status: {}",
240 status
241 )));
242 }
243 if exit_code == 99 {
244 tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
245 }
246
247 if let Some(dir) = output_dir {
249 let details = failure_details.lock().await;
250 if !details.is_empty() {
251 let failure_path = dir.join("conformance-failure-details.json");
252 let parsed: Vec<serde_json::Value> =
253 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
254 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
255 let _ = std::fs::write(&failure_path, json);
256 }
257 }
258
259 let exchanges = exchange_details.lock().await;
261 if !exchanges.is_empty() {
262 let exchange_path = dir.join("conformance-requests.json");
263 let parsed: Vec<serde_json::Value> =
264 exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
265 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
266 let _ = std::fs::write(&exchange_path, json);
267 tracing::info!(
268 "Exported {} request/response pairs to {}",
269 parsed.len(),
270 exchange_path.display()
271 );
272 }
273 }
274
275 let lines = log_lines.lock().await;
277 if !lines.is_empty() {
278 let log_path = dir.join("k6-output.log");
279 let _ = std::fs::write(&log_path, lines.join("\n"));
280 println!("k6 output log saved to: {}", log_path.display());
281 }
282 }
283
284 let results = if let Some(dir) = output_dir {
286 Self::parse_results(dir)?
287 } else {
288 K6Results::default()
289 };
290
291 Ok(results)
292 }
293
294 fn parse_results(output_dir: &Path) -> Result<K6Results> {
296 let summary_path = output_dir.join("summary.json");
297
298 if !summary_path.exists() {
299 return Ok(K6Results::default());
300 }
301
302 let content = std::fs::read_to_string(summary_path)
303 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
304
305 let json: serde_json::Value = serde_json::from_str(&content)
306 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
307
308 let duration_values = &json["metrics"]["http_req_duration"]["values"];
309
310 let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
311 let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
312 let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
313
314 Ok(K6Results {
315 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
316 failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
320 .as_u64()
321 .unwrap_or(0),
322 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
323 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
324 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
325 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
326 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
327 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
328 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
329 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
330 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
331 server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
332 server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
333 server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
334 server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
335 server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
336 server_reported_faults: server_fault.as_u64().unwrap_or(0),
337 })
338 }
339}
340
341impl Default for K6Executor {
342 fn default() -> Self {
343 Self::new().expect("k6 not found")
344 }
345}
346
347#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
349pub struct K6Results {
350 pub total_requests: u64,
351 pub failed_requests: u64,
352 pub avg_duration_ms: f64,
353 pub p95_duration_ms: f64,
354 pub p99_duration_ms: f64,
355 pub rps: f64,
356 pub vus_max: u32,
357 pub min_duration_ms: f64,
358 pub max_duration_ms: f64,
359 pub med_duration_ms: f64,
360 pub p90_duration_ms: f64,
361 #[serde(default)]
366 pub server_injected_latency_samples: u64,
367 #[serde(default)]
368 pub server_injected_latency_avg_ms: f64,
369 #[serde(default)]
370 pub server_injected_latency_max_ms: f64,
371 #[serde(default)]
372 pub server_injected_jitter_samples: u64,
373 #[serde(default)]
374 pub server_injected_jitter_avg_ms: f64,
375 #[serde(default)]
377 pub server_reported_faults: u64,
378}
379
380impl K6Results {
381 pub fn error_rate(&self) -> f64 {
383 if self.total_requests == 0 {
384 return 0.0;
385 }
386 (self.failed_requests as f64 / self.total_requests as f64) * 100.0
387 }
388
389 pub fn success_rate(&self) -> f64 {
391 100.0 - self.error_rate()
392 }
393}
394
395#[cfg(test)]
396mod tests {
397 use super::*;
398
399 #[test]
400 fn test_k6_results_error_rate() {
401 let results = K6Results {
402 total_requests: 100,
403 failed_requests: 5,
404 avg_duration_ms: 100.0,
405 p95_duration_ms: 200.0,
406 p99_duration_ms: 300.0,
407 ..Default::default()
408 };
409
410 assert_eq!(results.error_rate(), 5.0);
411 assert_eq!(results.success_rate(), 95.0);
412 }
413
414 #[test]
415 fn test_k6_results_zero_requests() {
416 let results = K6Results::default();
417 assert_eq!(results.error_rate(), 0.0);
418 }
419
420 #[test]
421 fn test_extract_failure_json_raw() {
422 let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
423 let result = extract_failure_json(line).unwrap();
424 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
425 assert_eq!(parsed["check"], "test");
426 }
427
428 #[test]
429 fn test_extract_failure_json_logfmt() {
430 let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
431 let result = extract_failure_json(line).unwrap();
432 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
433 assert_eq!(parsed["check"], "test");
434 assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
435 }
436
437 #[test]
438 fn test_extract_failure_json_no_marker() {
439 assert!(extract_failure_json("just a regular log line").is_none());
440 }
441}