1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Extracts the payload that follows `marker` on a single line of k6 output.
///
/// Two line shapes are recognised:
///
/// * **logfmt-wrapped** — the marker is immediately preceded by `msg="`. The payload is the
///   quoted value with one level of quoting removed: `\"` becomes `"`, `\\` becomes `\`, and
///   every other backslash pair (for example `\n` or `\u001a`) is copied through unchanged so
///   a JSON parser can interpret it later. Scanning stops at the first unescaped `"`;
///   anything after it (such as `source=console`) is ignored.
/// * **raw** — any other line. The payload is the rest of the line with surrounding
///   whitespace trimmed.
///
/// Returns `None` when `marker` does not occur in `line`, when a raw payload is empty after
/// trimming, or when a logfmt value is empty and never closed. An empty logfmt value that
/// *is* closed (`msg="MARKER"`) yields `Some("")`.
///
/// The logfmt scan walks whole `char`s rather than bytes, so a backslash followed by a
/// multi-byte character is copied through intact instead of splitting the character and
/// panicking on a non-boundary slice.
fn extract_mockforge_marker_json(line: &str, marker: &str) -> Option<String> {
    let start = line.find(marker)?;
    let rest = &line[start + marker.len()..];

    // The payload is logfmt-quoted only when the marker opens a `msg="..."` value.
    let is_logfmt = line[..start].ends_with("msg=\"");

    if !is_logfmt {
        let trimmed = rest.trim();
        return if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        };
    }

    let mut out = String::with_capacity(rest.len());
    let mut chars = rest.chars();
    while let Some(ch) = chars.next() {
        match ch {
            // Unescaped quote: end of the `msg` value.
            '"' => return Some(out),
            '\\' => match chars.next() {
                Some('"') => out.push('"'),
                Some('\\') => out.push('\\'),
                // Not a logfmt-level escape: keep the pair verbatim for the JSON parser.
                Some(other) => {
                    out.push('\\');
                    out.push(other);
                }
                // A lone backslash at the very end of the line is kept as-is.
                None => out.push('\\'),
            },
            other => out.push(other),
        }
    }

    // No closing quote was found; return whatever was collected, if anything.
    if out.is_empty() {
        None
    } else {
        Some(out)
    }
}
100
101fn extract_exchange_json(line: &str) -> Option<String> {
103 extract_mockforge_marker_json(line, "MOCKFORGE_EXCHANGE:")
104}
105
106fn extract_failure_json(line: &str) -> Option<String> {
108 extract_mockforge_marker_json(line, "MOCKFORGE_FAILURE:")
109}
110
/// Runs the `k6` command-line tool and collects what it produces.
pub struct K6Executor {
    /// Location of the `k6` binary, resolved once in [`K6Executor::new`].
    k6_path: String,
    /// Value forwarded to k6's `--local-ips` flag. When empty, the flag is not passed.
    local_ips: String,
}
121
122impl K6Executor {
123 pub fn new() -> Result<Self> {
125 let k6_path = which::which("k6")
126 .map_err(|_| BenchError::K6NotFound)?
127 .to_string_lossy()
128 .to_string();
129
130 Ok(Self {
131 k6_path,
132 local_ips: String::new(),
133 })
134 }
135
136 pub fn with_local_ips(mut self, local_ips: impl Into<String>) -> Self {
140 self.local_ips = local_ips.into();
141 self
142 }
143
144 pub fn is_k6_installed() -> bool {
146 which::which("k6").is_ok()
147 }
148
149 pub async fn get_version(&self) -> Result<String> {
151 let output = TokioCommand::new(&self.k6_path)
152 .arg("version")
153 .output()
154 .await
155 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
156
157 if !output.status.success() {
158 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
159 }
160
161 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
162 }
163
164 pub async fn execute(
171 &self,
172 script_path: &Path,
173 output_dir: Option<&Path>,
174 verbose: bool,
175 ) -> Result<K6Results> {
176 self.execute_with_port(script_path, output_dir, verbose, None).await
177 }
178
179 pub async fn execute_with_port(
181 &self,
182 script_path: &Path,
183 output_dir: Option<&Path>,
184 verbose: bool,
185 api_port: Option<u16>,
186 ) -> Result<K6Results> {
187 println!("Starting load test...\n");
188
189 let mut cmd = TokioCommand::new(&self.k6_path);
190 cmd.arg("run");
191
192 if let Some(port) = api_port {
195 cmd.arg("--address").arg(format!("localhost:{}", port));
196 }
197
198 if !self.local_ips.is_empty() {
203 cmd.arg("--local-ips").arg(&self.local_ips);
204 }
205
206 if verbose {
213 cmd.arg("--verbose");
214 }
215
216 let abs_script =
218 std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
219 cmd.arg(&abs_script);
220
221 if let Some(dir) = output_dir {
224 cmd.current_dir(dir);
225 }
226
227 cmd.stdout(Stdio::piped());
228 cmd.stderr(Stdio::piped());
229
230 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
231
232 let stdout = child
233 .stdout
234 .take()
235 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
236
237 let stderr = child
238 .stderr
239 .take()
240 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
241
242 let stdout_reader = BufReader::new(stdout);
244 let stderr_reader = BufReader::new(stderr);
245
246 let mut stdout_lines = stdout_reader.lines();
247 let mut stderr_lines = stderr_reader.lines();
248
249 let spinner = ProgressBar::new_spinner();
251 spinner.set_style(
252 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
253 );
254 spinner.set_message("Running load test...");
255
256 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
259 Arc::new(tokio::sync::Mutex::new(Vec::new()));
260 let fd_stdout = Arc::clone(&failure_details);
261 let fd_stderr = Arc::clone(&failure_details);
262
263 let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
265 Arc::new(tokio::sync::Mutex::new(Vec::new()));
266 let ex_stdout = Arc::clone(&exchange_details);
267 let ex_stderr = Arc::clone(&exchange_details);
268
269 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
271 Arc::new(tokio::sync::Mutex::new(Vec::new()));
272 let log_stdout = Arc::clone(&log_lines);
273 let log_stderr = Arc::clone(&log_lines);
274
275 let stdout_handle = tokio::spawn(async move {
277 while let Ok(Some(line)) = stdout_lines.next_line().await {
278 log_stdout.lock().await.push(format!("[stdout] {}", line));
279 if let Some(json_str) = extract_failure_json(&line) {
280 fd_stdout.lock().await.push(json_str);
281 } else if let Some(json_str) = extract_exchange_json(&line) {
282 ex_stdout.lock().await.push(json_str);
283 } else {
284 spinner.set_message(line.clone());
285 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
286 println!("{}", line);
287 }
288 }
289 }
290 spinner.finish_and_clear();
291 });
292
293 let stderr_handle = tokio::spawn(async move {
295 while let Ok(Some(line)) = stderr_lines.next_line().await {
296 if !line.is_empty() {
297 log_stderr.lock().await.push(format!("[stderr] {}", line));
298 if let Some(json_str) = extract_failure_json(&line) {
299 fd_stderr.lock().await.push(json_str);
300 } else if let Some(json_str) = extract_exchange_json(&line) {
301 ex_stderr.lock().await.push(json_str);
302 } else {
303 eprintln!("{}", line);
304 }
305 }
306 }
307 });
308
309 let status =
311 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
312
313 let _ = stdout_handle.await;
315 let _ = stderr_handle.await;
316
317 let exit_code = status.code().unwrap_or(-1);
320 if !status.success() && exit_code != 99 {
321 return Err(BenchError::K6ExecutionFailed(format!(
322 "k6 exited with status: {}",
323 status
324 )));
325 }
326 if exit_code == 99 {
327 tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
328 }
329
330 if let Some(dir) = output_dir {
332 let details = failure_details.lock().await;
333 if !details.is_empty() {
334 let failure_path = dir.join("conformance-failure-details.json");
335 let parsed: Vec<serde_json::Value> =
336 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
337 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
338 let _ = std::fs::write(&failure_path, json);
339 }
340 }
341
342 let exchanges = exchange_details.lock().await;
344 if !exchanges.is_empty() {
345 let exchange_path = dir.join("conformance-requests.json");
346 let parsed: Vec<serde_json::Value> =
347 exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
348 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
349 let _ = std::fs::write(&exchange_path, json);
350 tracing::info!(
351 "Exported {} request/response pairs to {}",
352 parsed.len(),
353 exchange_path.display()
354 );
355 }
356 }
357
358 let lines = log_lines.lock().await;
360 if !lines.is_empty() {
361 let log_path = dir.join("k6-output.log");
362 let _ = std::fs::write(&log_path, lines.join("\n"));
363 println!("k6 output log saved to: {}", log_path.display());
364 }
365 }
366
367 let results = if let Some(dir) = output_dir {
369 Self::parse_results(dir)?
370 } else {
371 K6Results::default()
372 };
373
374 Ok(results)
375 }
376
377 fn parse_results(output_dir: &Path) -> Result<K6Results> {
379 let summary_path = output_dir.join("summary.json");
380
381 if !summary_path.exists() {
382 return Ok(K6Results::default());
383 }
384
385 let content = std::fs::read_to_string(summary_path)
386 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
387
388 let json: serde_json::Value = serde_json::from_str(&content)
389 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
390
391 let duration_values = &json["metrics"]["http_req_duration"]["values"];
392
393 let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
394 let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
395 let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
396
397 let tcp_connecting = &json["metrics"]["http_req_connecting"]["values"];
408 let tls_handshake = &json["metrics"]["http_req_tls_handshaking"]["values"];
409 let mf_conns_opened = &json["metrics"]["mockforge_connections_opened"]["values"]["count"];
410
411 Ok(K6Results {
412 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
413 failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
417 .as_u64()
418 .unwrap_or(0),
419 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
420 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
421 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
422 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
423 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
424 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
425 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
426 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
427 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
428 server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
429 server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
430 server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
431 server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
432 server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
433 server_reported_faults: server_fault.as_u64().unwrap_or(0),
434 tcp_connect_samples: mf_conns_opened.as_u64().unwrap_or(0),
437 tcp_connect_avg_ms: tcp_connecting["avg"].as_f64().unwrap_or(0.0),
438 tcp_connect_max_ms: tcp_connecting["max"].as_f64().unwrap_or(0.0),
439 tls_handshake_samples: if tls_handshake["avg"].as_f64().unwrap_or(0.0) > 0.0 {
441 mf_conns_opened.as_u64().unwrap_or(0)
444 } else {
445 0
446 },
447 tls_handshake_avg_ms: tls_handshake["avg"].as_f64().unwrap_or(0.0),
448 tls_handshake_max_ms: tls_handshake["max"].as_f64().unwrap_or(0.0),
449 iterations_completed: json["metrics"]["iterations"]["values"]["count"]
450 .as_u64()
451 .unwrap_or(0),
452 })
453 }
454}
455
456impl Default for K6Executor {
457 fn default() -> Self {
458 Self::new().expect("k6 not found")
459 }
460}
461
462#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
464pub struct K6Results {
465 pub total_requests: u64,
466 pub failed_requests: u64,
467 pub avg_duration_ms: f64,
468 pub p95_duration_ms: f64,
469 pub p99_duration_ms: f64,
470 pub rps: f64,
471 pub vus_max: u32,
472 pub min_duration_ms: f64,
473 pub max_duration_ms: f64,
474 pub med_duration_ms: f64,
475 pub p90_duration_ms: f64,
476 #[serde(default)]
481 pub server_injected_latency_samples: u64,
482 #[serde(default)]
483 pub server_injected_latency_avg_ms: f64,
484 #[serde(default)]
485 pub server_injected_latency_max_ms: f64,
486 #[serde(default)]
487 pub server_injected_jitter_samples: u64,
488 #[serde(default)]
489 pub server_injected_jitter_avg_ms: f64,
490 #[serde(default)]
492 pub server_reported_faults: u64,
493 #[serde(default)]
499 pub tcp_connect_samples: u64,
500 #[serde(default)]
501 pub tcp_connect_avg_ms: f64,
502 #[serde(default)]
503 pub tcp_connect_max_ms: f64,
504 #[serde(default)]
507 pub tls_handshake_samples: u64,
508 #[serde(default)]
509 pub tls_handshake_avg_ms: f64,
510 #[serde(default)]
511 pub tls_handshake_max_ms: f64,
512 #[serde(default)]
518 pub iterations_completed: u64,
519}
520
521impl K6Results {
522 pub fn error_rate(&self) -> f64 {
524 if self.total_requests == 0 {
525 return 0.0;
526 }
527 (self.failed_requests as f64 / self.total_requests as f64) * 100.0
528 }
529
530 pub fn success_rate(&self) -> f64 {
532 100.0 - self.error_rate()
533 }
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    /// Extracts the `MOCKFORGE_FAILURE:` payload from `line` and parses it as JSON.
    fn failure_value(line: &str) -> serde_json::Value {
        let payload = extract_failure_json(line).unwrap();
        serde_json::from_str(&payload).unwrap()
    }

    /// Extracts the `MOCKFORGE_EXCHANGE:` payload from `line` and parses it as JSON.
    fn exchange_value(line: &str) -> serde_json::Value {
        let payload = extract_exchange_json(line).unwrap();
        serde_json::from_str(&payload).unwrap()
    }

    #[test]
    fn test_k6_results_error_rate() {
        let sample = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..K6Results::default()
        };

        assert_eq!(sample.error_rate(), 5.0);
        assert_eq!(sample.success_rate(), 95.0);
    }

    #[test]
    fn test_k6_results_zero_requests() {
        // No requests at all must not divide by zero.
        let empty = K6Results::default();
        assert_eq!(empty.error_rate(), 0.0);
    }

    #[test]
    fn test_extract_failure_json_raw() {
        // Marker at the start of the line, payload not wrapped in logfmt.
        let parsed =
            failure_value(r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#);
        assert_eq!(parsed["check"], "test");
    }

    #[test]
    fn test_extract_failure_json_logfmt() {
        // Payload wrapped in a logfmt `msg="..."` value, with a nested JSON string.
        let parsed = failure_value(
            r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#,
        );
        assert_eq!(parsed["check"], "test");
        assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
    }

    #[test]
    fn test_extract_failure_json_no_marker() {
        let extracted = extract_failure_json("just a regular log line");
        assert!(extracted.is_none());
    }

    #[test]
    fn test_extract_exchange_logfmt_with_backslash_escapes() {
        // JSON-level escapes (`\r`, `\n`, `\u001a`) must survive the logfmt unwrapping.
        let parsed = exchange_value(
            r#"time="2026-06-26T10:00:00Z" level=info msg="MOCKFORGE_EXCHANGE:{\"check\":\"u\",\"request\":{\"body\":\"--bnd\\r\\n\\u001a\"}}" source=console"#,
        );
        assert_eq!(parsed["check"], "u");
        assert_eq!(parsed["request"]["body"], "--bnd\r\n\u{001a}");
    }

    #[test]
    fn test_extract_exchange_raw_no_logfmt_wrapping() {
        let parsed = exchange_value(
            r#"MOCKFORGE_EXCHANGE:{"check":"x","request":{"body":""},"response":{"status":200}}"#,
        );
        assert_eq!(parsed["check"], "x");
        assert_eq!(parsed["response"]["status"], 200);
    }

    #[test]
    fn test_extract_exchange_logfmt_tolerates_extra_trailing_fields() {
        // Fields after the closing quote of `msg` must be ignored.
        let parsed = exchange_value(
            r#"msg="MOCKFORGE_EXCHANGE:{\"check\":\"t\"}" source=console vu=1 iter=0"#,
        );
        assert_eq!(parsed["check"], "t");
    }

    #[test]
    fn test_extract_exchange_double_backslash_followed_by_quote() {
        // An escaped backslash directly before an escaped quote must not end the value early.
        let parsed = exchange_value(
            r#"msg="MOCKFORGE_EXCHANGE:{\"k\":\"a\\\\\\\"x\\\"\"}" source=console"#,
        );
        assert_eq!(parsed["k"], r#"a\"x""#);
    }
}