1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Extracts the JSON payload that follows a `MOCKFORGE_EXCHANGE:` marker in a k6 output line.
///
/// Two line shapes are recognised:
///
/// * raw — `MOCKFORGE_EXCHANGE:{...}`: the payload is returned as-is (trimmed).
/// * logfmt-wrapped — `... msg="MOCKFORGE_EXCHANGE:{\"k\":...}" source=console`:
///   the trailing `" source=console` is removed and the `\\` / `\"` escapes are undone.
///
/// Returns `None` when the marker is absent or only whitespace follows it.
fn extract_exchange_json(line: &str) -> Option<String> {
    const MARKER: &str = "MOCKFORGE_EXCHANGE:";
    const LOGFMT_SUFFIX: &str = "\" source=console";

    let (_, rest) = line.split_once(MARKER)?;

    // Trim first so trailing whitespace cannot hide the logfmt wrapper.
    let rest = rest.trim_end();
    let (payload, wrapped) = match rest.strip_suffix(LOGFMT_SUFFIX) {
        Some(inner) => (inner, true),
        None => (rest, false),
    };
    let payload = payload.trim();
    if payload.is_empty() {
        return None;
    }

    // Decide on unescaping from the line's shape, not from the mere presence of `\"`:
    // a raw JSON object may legitimately contain `\"` inside a string value and must
    // not be touched. A raw object can never have a backslash as the first token after
    // `{`, so that is kept as a fallback signal for escaped payloads whose wrapper
    // suffix was not recognised.
    let escaped = match payload.strip_prefix('{') {
        Some(body) => wrapped || body.trim_start().starts_with('\\'),
        None => false,
    };

    if escaped {
        // Order matters: collapsing `\\` first turns `\\\"` into `\\"`, which the
        // second pass reduces to `\"` — the correctly unescaped form.
        Some(payload.replace("\\\\", "\\").replace("\\\"", "\""))
    } else {
        Some(payload.to_string())
    }
}
27
/// Extracts the JSON payload that follows a `MOCKFORGE_FAILURE:` marker in a k6 output line.
///
/// Two line shapes are recognised:
///
/// * raw — `MOCKFORGE_FAILURE:{...}`: the payload is returned as-is (trimmed).
/// * logfmt-wrapped — `... msg="MOCKFORGE_FAILURE:{\"k\":...}" source=console`:
///   the trailing `" source=console` is removed and the `\\` / `\"` escapes are undone.
///
/// Returns `None` when the marker is absent or only whitespace follows it.
fn extract_failure_json(line: &str) -> Option<String> {
    const MARKER: &str = "MOCKFORGE_FAILURE:";
    const LOGFMT_SUFFIX: &str = "\" source=console";

    let (_, rest) = line.split_once(MARKER)?;

    // Trim first so trailing whitespace cannot hide the logfmt wrapper.
    let rest = rest.trim_end();
    let (payload, wrapped) = match rest.strip_suffix(LOGFMT_SUFFIX) {
        Some(inner) => (inner, true),
        None => (rest, false),
    };
    let payload = payload.trim();
    if payload.is_empty() {
        return None;
    }

    // Decide on unescaping from the line's shape, not from the mere presence of `\"`:
    // a raw JSON object may legitimately contain `\"` inside a string value and must
    // not be touched. A raw object can never have a backslash as the first token after
    // `{`, so that is kept as a fallback signal for escaped payloads whose wrapper
    // suffix was not recognised.
    let escaped = match payload.strip_prefix('{') {
        Some(body) => wrapped || body.trim_start().starts_with('\\'),
        None => false,
    };

    if escaped {
        // Order matters: collapsing `\\` first turns `\\\"` into `\\"`, which the
        // second pass reduces to `\"` — the correctly unescaped form.
        Some(payload.replace("\\\\", "\\").replace("\\\"", "\""))
    } else {
        Some(payload.to_string())
    }
}
52
/// Runs the `k6` binary as a child process and collects its output and results.
pub struct K6Executor {
    /// Path to the `k6` executable, as resolved by `which::which("k6")` in
    /// [`K6Executor::new`] (stored via a lossy UTF-8 conversion).
    k6_path: String,
}
57
58impl K6Executor {
59 pub fn new() -> Result<Self> {
61 let k6_path = which::which("k6")
62 .map_err(|_| BenchError::K6NotFound)?
63 .to_string_lossy()
64 .to_string();
65
66 Ok(Self { k6_path })
67 }
68
69 pub fn is_k6_installed() -> bool {
71 which::which("k6").is_ok()
72 }
73
74 pub async fn get_version(&self) -> Result<String> {
76 let output = TokioCommand::new(&self.k6_path)
77 .arg("version")
78 .output()
79 .await
80 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
81
82 if !output.status.success() {
83 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
84 }
85
86 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
87 }
88
89 pub async fn execute(
96 &self,
97 script_path: &Path,
98 output_dir: Option<&Path>,
99 verbose: bool,
100 ) -> Result<K6Results> {
101 self.execute_with_port(script_path, output_dir, verbose, None).await
102 }
103
104 pub async fn execute_with_port(
106 &self,
107 script_path: &Path,
108 output_dir: Option<&Path>,
109 verbose: bool,
110 api_port: Option<u16>,
111 ) -> Result<K6Results> {
112 println!("Starting load test...\n");
113
114 let mut cmd = TokioCommand::new(&self.k6_path);
115 cmd.arg("run");
116
117 if let Some(port) = api_port {
120 cmd.arg("--address").arg(format!("localhost:{}", port));
121 }
122
123 if verbose {
130 cmd.arg("--verbose");
131 }
132
133 let abs_script =
135 std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
136 cmd.arg(&abs_script);
137
138 if let Some(dir) = output_dir {
141 cmd.current_dir(dir);
142 }
143
144 cmd.stdout(Stdio::piped());
145 cmd.stderr(Stdio::piped());
146
147 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
148
149 let stdout = child
150 .stdout
151 .take()
152 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
153
154 let stderr = child
155 .stderr
156 .take()
157 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
158
159 let stdout_reader = BufReader::new(stdout);
161 let stderr_reader = BufReader::new(stderr);
162
163 let mut stdout_lines = stdout_reader.lines();
164 let mut stderr_lines = stderr_reader.lines();
165
166 let spinner = ProgressBar::new_spinner();
168 spinner.set_style(
169 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
170 );
171 spinner.set_message("Running load test...");
172
173 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
176 Arc::new(tokio::sync::Mutex::new(Vec::new()));
177 let fd_stdout = Arc::clone(&failure_details);
178 let fd_stderr = Arc::clone(&failure_details);
179
180 let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
182 Arc::new(tokio::sync::Mutex::new(Vec::new()));
183 let ex_stdout = Arc::clone(&exchange_details);
184 let ex_stderr = Arc::clone(&exchange_details);
185
186 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
188 Arc::new(tokio::sync::Mutex::new(Vec::new()));
189 let log_stdout = Arc::clone(&log_lines);
190 let log_stderr = Arc::clone(&log_lines);
191
192 let stdout_handle = tokio::spawn(async move {
194 while let Ok(Some(line)) = stdout_lines.next_line().await {
195 log_stdout.lock().await.push(format!("[stdout] {}", line));
196 if let Some(json_str) = extract_failure_json(&line) {
197 fd_stdout.lock().await.push(json_str);
198 } else if let Some(json_str) = extract_exchange_json(&line) {
199 ex_stdout.lock().await.push(json_str);
200 } else {
201 spinner.set_message(line.clone());
202 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
203 println!("{}", line);
204 }
205 }
206 }
207 spinner.finish_and_clear();
208 });
209
210 let stderr_handle = tokio::spawn(async move {
212 while let Ok(Some(line)) = stderr_lines.next_line().await {
213 if !line.is_empty() {
214 log_stderr.lock().await.push(format!("[stderr] {}", line));
215 if let Some(json_str) = extract_failure_json(&line) {
216 fd_stderr.lock().await.push(json_str);
217 } else if let Some(json_str) = extract_exchange_json(&line) {
218 ex_stderr.lock().await.push(json_str);
219 } else {
220 eprintln!("{}", line);
221 }
222 }
223 }
224 });
225
226 let status =
228 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
229
230 let _ = stdout_handle.await;
232 let _ = stderr_handle.await;
233
234 let exit_code = status.code().unwrap_or(-1);
237 if !status.success() && exit_code != 99 {
238 return Err(BenchError::K6ExecutionFailed(format!(
239 "k6 exited with status: {}",
240 status
241 )));
242 }
243 if exit_code == 99 {
244 tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
245 }
246
247 if let Some(dir) = output_dir {
249 let details = failure_details.lock().await;
250 if !details.is_empty() {
251 let failure_path = dir.join("conformance-failure-details.json");
252 let parsed: Vec<serde_json::Value> =
253 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
254 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
255 let _ = std::fs::write(&failure_path, json);
256 }
257 }
258
259 let exchanges = exchange_details.lock().await;
261 if !exchanges.is_empty() {
262 let exchange_path = dir.join("conformance-requests.json");
263 let parsed: Vec<serde_json::Value> =
264 exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
265 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
266 let _ = std::fs::write(&exchange_path, json);
267 tracing::info!(
268 "Exported {} request/response pairs to {}",
269 parsed.len(),
270 exchange_path.display()
271 );
272 }
273 }
274
275 let lines = log_lines.lock().await;
277 if !lines.is_empty() {
278 let log_path = dir.join("k6-output.log");
279 let _ = std::fs::write(&log_path, lines.join("\n"));
280 println!("k6 output log saved to: {}", log_path.display());
281 }
282 }
283
284 let results = if let Some(dir) = output_dir {
286 Self::parse_results(dir)?
287 } else {
288 K6Results::default()
289 };
290
291 Ok(results)
292 }
293
294 fn parse_results(output_dir: &Path) -> Result<K6Results> {
296 let summary_path = output_dir.join("summary.json");
297
298 if !summary_path.exists() {
299 return Ok(K6Results::default());
300 }
301
302 let content = std::fs::read_to_string(summary_path)
303 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
304
305 let json: serde_json::Value = serde_json::from_str(&content)
306 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
307
308 let duration_values = &json["metrics"]["http_req_duration"]["values"];
309
310 let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
311 let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
312 let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
313
314 let tcp_connecting = &json["metrics"]["http_req_connecting"]["values"];
319 let tls_handshake = &json["metrics"]["http_req_tls_handshaking"]["values"];
320
321 Ok(K6Results {
322 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
323 failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
327 .as_u64()
328 .unwrap_or(0),
329 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
330 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
331 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
332 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
333 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
334 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
335 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
336 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
337 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
338 server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
339 server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
340 server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
341 server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
342 server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
343 server_reported_faults: server_fault.as_u64().unwrap_or(0),
344 tcp_connect_samples: tcp_connecting["count"].as_u64().unwrap_or(0),
345 tcp_connect_avg_ms: tcp_connecting["avg"].as_f64().unwrap_or(0.0),
346 tcp_connect_max_ms: tcp_connecting["max"].as_f64().unwrap_or(0.0),
347 tls_handshake_samples: tls_handshake["count"].as_u64().unwrap_or(0),
348 tls_handshake_avg_ms: tls_handshake["avg"].as_f64().unwrap_or(0.0),
349 tls_handshake_max_ms: tls_handshake["max"].as_f64().unwrap_or(0.0),
350 })
351 }
352}
353
impl Default for K6Executor {
    /// Builds an executor through [`K6Executor::new`].
    ///
    /// # Panics
    ///
    /// Panics with the message "k6 not found" when `new` returns an error. Prefer
    /// calling `new` directly where a missing `k6` binary should be handled.
    fn default() -> Self {
        Self::new().expect("k6 not found")
    }
}
359
/// Metrics extracted from a k6 run.
///
/// `K6Executor::parse_results` fills these from the `metrics` object of
/// `summary.json`; every field is zero when the corresponding metric is missing.
/// Fields marked `#[serde(default)]` may be absent when deserializing.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct K6Results {
    /// `http_reqs.values.count`.
    pub total_requests: u64,
    /// `http_req_failed.values.passes`.
    pub failed_requests: u64,
    /// `http_req_duration.values.avg`.
    pub avg_duration_ms: f64,
    /// `http_req_duration.values["p(95)"]`.
    pub p95_duration_ms: f64,
    /// `http_req_duration.values["p(99)"]`.
    pub p99_duration_ms: f64,
    /// `http_reqs.values.rate`.
    pub rps: f64,
    /// `vus_max.values.value`.
    pub vus_max: u32,
    /// `http_req_duration.values.min`.
    pub min_duration_ms: f64,
    /// `http_req_duration.values.max`.
    pub max_duration_ms: f64,
    /// `http_req_duration.values.med`.
    pub med_duration_ms: f64,
    /// `http_req_duration.values["p(90)"]`.
    pub p90_duration_ms: f64,
    /// `mockforge_server_injected_latency_ms.values.count`.
    #[serde(default)]
    pub server_injected_latency_samples: u64,
    /// `mockforge_server_injected_latency_ms.values.avg`.
    #[serde(default)]
    pub server_injected_latency_avg_ms: f64,
    /// `mockforge_server_injected_latency_ms.values.max`.
    #[serde(default)]
    pub server_injected_latency_max_ms: f64,
    /// `mockforge_server_injected_jitter_ms.values.count`.
    #[serde(default)]
    pub server_injected_jitter_samples: u64,
    /// `mockforge_server_injected_jitter_ms.values.avg`.
    #[serde(default)]
    pub server_injected_jitter_avg_ms: f64,
    /// `mockforge_server_fault_total.values.count`.
    #[serde(default)]
    pub server_reported_faults: u64,
    /// `http_req_connecting.values.count`.
    #[serde(default)]
    pub tcp_connect_samples: u64,
    /// `http_req_connecting.values.avg`.
    #[serde(default)]
    pub tcp_connect_avg_ms: f64,
    /// `http_req_connecting.values.max`.
    #[serde(default)]
    pub tcp_connect_max_ms: f64,
    /// `http_req_tls_handshaking.values.count`.
    #[serde(default)]
    pub tls_handshake_samples: u64,
    /// `http_req_tls_handshaking.values.avg`.
    #[serde(default)]
    pub tls_handshake_avg_ms: f64,
    /// `http_req_tls_handshaking.values.max`.
    #[serde(default)]
    pub tls_handshake_max_ms: f64,
}
411
412impl K6Results {
413 pub fn error_rate(&self) -> f64 {
415 if self.total_requests == 0 {
416 return 0.0;
417 }
418 (self.failed_requests as f64 / self.total_requests as f64) * 100.0
419 }
420
421 pub fn success_rate(&self) -> f64 {
423 100.0 - self.error_rate()
424 }
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    /// 5 failures out of 100 requests give a 5% error rate and a 95% success rate.
    #[test]
    fn test_k6_results_error_rate() {
        let sample = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..K6Results::default()
        };

        let (errors, successes) = (sample.error_rate(), sample.success_rate());
        assert_eq!(errors, 5.0);
        assert_eq!(successes, 95.0);
    }

    /// With no requests recorded the error rate is zero rather than NaN.
    #[test]
    fn test_k6_results_zero_requests() {
        assert_eq!(K6Results::default().error_rate(), 0.0);
    }

    /// A raw marker line yields the JSON that follows the marker.
    #[test]
    fn test_extract_failure_json_raw() {
        let raw_line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
        let payload = extract_failure_json(raw_line).unwrap();
        let value: serde_json::Value = serde_json::from_str(&payload).unwrap();
        assert_eq!(value["check"], "test");
    }

    /// A logfmt-wrapped marker line is unwrapped and unescaped into valid JSON,
    /// including a nested JSON string in the response body.
    #[test]
    fn test_extract_failure_json_logfmt() {
        let logfmt_line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
        let payload = extract_failure_json(logfmt_line).unwrap();
        let value: serde_json::Value = serde_json::from_str(&payload).unwrap();
        assert_eq!(value["check"], "test");
        assert_eq!(value["response"]["body"], r#"{"key":"val"}"#);
    }

    /// Lines without the marker produce nothing.
    #[test]
    fn test_extract_failure_json_no_marker() {
        let extracted = extract_failure_json("just a regular log line");
        assert_eq!(extracted, None);
    }
}