1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
11fn extract_mockforge_marker_json(line: &str, marker: &str) -> Option<String> {
34 let start = line.find(marker)?;
35 let json_start = start + marker.len();
36 let rest = &line[json_start..];
37
38 let is_logfmt = start >= 5 && line.as_bytes().get(start - 5..start) == Some(b"msg=\"");
42 if is_logfmt {
43 let bytes = rest.as_bytes();
48 let mut i = 0;
49 let mut out = String::with_capacity(rest.len());
50 while i < bytes.len() {
51 let b = bytes[i];
52 if b == b'"' {
53 return Some(out);
55 }
56 if b == b'\\' && i + 1 < bytes.len() {
57 let next = bytes[i + 1];
58 match next {
59 b'"' => out.push('"'),
60 b'\\' => out.push('\\'),
61 other => {
65 out.push('\\');
66 out.push(other as char);
67 }
68 }
69 i += 2;
70 continue;
71 }
72 let ch_start = i;
75 i += 1;
77 while i < bytes.len() && (bytes[i] & 0b1100_0000) == 0b1000_0000 {
78 i += 1;
79 }
80 out.push_str(&rest[ch_start..i]);
81 }
82 if out.is_empty() {
85 None
86 } else {
87 Some(out)
88 }
89 } else {
90 let trimmed = rest.trim();
93 if trimmed.is_empty() {
94 None
95 } else {
96 Some(trimmed.to_string())
97 }
98 }
99}
100
101fn extract_exchange_json(line: &str) -> Option<String> {
103 extract_mockforge_marker_json(line, "MOCKFORGE_EXCHANGE:")
104}
105
106fn extract_failure_json(line: &str) -> Option<String> {
108 extract_mockforge_marker_json(line, "MOCKFORGE_FAILURE:")
109}
110
111fn extract_network_event_json(line: &str) -> Option<String> {
115 extract_mockforge_marker_json(line, "MOCKFORGE_NETWORK_EVENT:")
116}
117
118pub struct K6Executor {
120 k6_path: String,
121 local_ips: String,
127}
128
129impl K6Executor {
130 pub fn new() -> Result<Self> {
132 let k6_path = which::which("k6")
133 .map_err(|_| BenchError::K6NotFound)?
134 .to_string_lossy()
135 .to_string();
136
137 Ok(Self {
138 k6_path,
139 local_ips: String::new(),
140 })
141 }
142
143 pub fn with_local_ips(mut self, local_ips: impl Into<String>) -> Self {
147 self.local_ips = local_ips.into();
148 self
149 }
150
151 pub fn is_k6_installed() -> bool {
153 which::which("k6").is_ok()
154 }
155
156 pub async fn get_version(&self) -> Result<String> {
158 let output = TokioCommand::new(&self.k6_path)
159 .arg("version")
160 .output()
161 .await
162 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
163
164 if !output.status.success() {
165 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
166 }
167
168 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
169 }
170
171 pub async fn execute(
178 &self,
179 script_path: &Path,
180 output_dir: Option<&Path>,
181 verbose: bool,
182 ) -> Result<K6Results> {
183 self.execute_with_port(script_path, output_dir, verbose, None).await
184 }
185
186 pub async fn execute_with_port(
188 &self,
189 script_path: &Path,
190 output_dir: Option<&Path>,
191 verbose: bool,
192 api_port: Option<u16>,
193 ) -> Result<K6Results> {
194 println!("Starting load test...\n");
195
196 let mut cmd = TokioCommand::new(&self.k6_path);
197 cmd.arg("run");
198
199 if let Some(port) = api_port {
202 cmd.arg("--address").arg(format!("localhost:{}", port));
203 }
204
205 if !self.local_ips.is_empty() {
210 cmd.arg("--local-ips").arg(&self.local_ips);
211 }
212
213 if verbose {
220 cmd.arg("--verbose");
221 }
222
223 let abs_script =
225 std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
226 cmd.arg(&abs_script);
227
228 if let Some(dir) = output_dir {
231 cmd.current_dir(dir);
232 }
233
234 cmd.stdout(Stdio::piped());
235 cmd.stderr(Stdio::piped());
236
237 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
238
239 let stdout = child
240 .stdout
241 .take()
242 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
243
244 let stderr = child
245 .stderr
246 .take()
247 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
248
249 let stdout_reader = BufReader::new(stdout);
251 let stderr_reader = BufReader::new(stderr);
252
253 let mut stdout_lines = stdout_reader.lines();
254 let mut stderr_lines = stderr_reader.lines();
255
256 let spinner = ProgressBar::new_spinner();
258 spinner.set_style(
259 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
260 );
261 spinner.set_message("Running load test...");
262
263 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
266 Arc::new(tokio::sync::Mutex::new(Vec::new()));
267 let fd_stdout = Arc::clone(&failure_details);
268 let fd_stderr = Arc::clone(&failure_details);
269
270 let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
272 Arc::new(tokio::sync::Mutex::new(Vec::new()));
273 let ex_stdout = Arc::clone(&exchange_details);
274 let ex_stderr = Arc::clone(&exchange_details);
275
276 let network_events: Arc<tokio::sync::Mutex<Vec<String>>> =
281 Arc::new(tokio::sync::Mutex::new(Vec::new()));
282 let ne_stdout = Arc::clone(&network_events);
283 let ne_stderr = Arc::clone(&network_events);
284
285 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
287 Arc::new(tokio::sync::Mutex::new(Vec::new()));
288 let log_stdout = Arc::clone(&log_lines);
289 let log_stderr = Arc::clone(&log_lines);
290
291 let stdout_handle = tokio::spawn(async move {
293 while let Ok(Some(line)) = stdout_lines.next_line().await {
294 log_stdout.lock().await.push(format!("[stdout] {}", line));
295 if let Some(json_str) = extract_failure_json(&line) {
296 fd_stdout.lock().await.push(json_str);
297 } else if let Some(json_str) = extract_exchange_json(&line) {
298 ex_stdout.lock().await.push(json_str);
299 } else if let Some(json_str) = extract_network_event_json(&line) {
300 ne_stdout.lock().await.push(json_str);
301 } else {
302 spinner.set_message(line.clone());
303 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
304 println!("{}", line);
305 }
306 }
307 }
308 spinner.finish_and_clear();
309 });
310
311 let stderr_handle = tokio::spawn(async move {
313 while let Ok(Some(line)) = stderr_lines.next_line().await {
314 if !line.is_empty() {
315 log_stderr.lock().await.push(format!("[stderr] {}", line));
316 if let Some(json_str) = extract_failure_json(&line) {
317 fd_stderr.lock().await.push(json_str);
318 } else if let Some(json_str) = extract_exchange_json(&line) {
319 ex_stderr.lock().await.push(json_str);
320 } else if let Some(json_str) = extract_network_event_json(&line) {
321 ne_stderr.lock().await.push(json_str);
322 } else {
323 eprintln!("{}", line);
324 }
325 }
326 }
327 });
328
329 let status =
331 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
332
333 let _ = stdout_handle.await;
335 let _ = stderr_handle.await;
336
337 let exit_code = status.code().unwrap_or(-1);
340 if !status.success() && exit_code != 99 {
341 return Err(BenchError::K6ExecutionFailed(format!(
342 "k6 exited with status: {}",
343 status
344 )));
345 }
346 if exit_code == 99 {
347 tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
348 }
349
350 if let Some(dir) = output_dir {
352 let details = failure_details.lock().await;
353 if !details.is_empty() {
354 let failure_path = dir.join("conformance-failure-details.json");
355 let parsed: Vec<serde_json::Value> =
356 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
357 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
358 let _ = std::fs::write(&failure_path, json);
359 }
360 }
361
362 let exchanges = exchange_details.lock().await;
364 if !exchanges.is_empty() {
365 let exchange_path = dir.join("conformance-requests.json");
366 let parsed: Vec<serde_json::Value> =
367 exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
368 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
369 let _ = std::fs::write(&exchange_path, json);
370 tracing::info!(
371 "Exported {} request/response pairs to {}",
372 parsed.len(),
373 exchange_path.display()
374 );
375 }
376 }
377
378 let net_events = network_events.lock().await;
383 let net_path = dir.join("conformance-network-events.json");
384 let parsed: Vec<serde_json::Value> =
385 net_events.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
386 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
387 let _ = std::fs::write(&net_path, json);
388 if !parsed.is_empty() {
389 tracing::warn!(
390 "Recorded {} wire-level network event(s) to {}",
391 parsed.len(),
392 net_path.display()
393 );
394 }
395 }
396
397 let lines = log_lines.lock().await;
399 if !lines.is_empty() {
400 let log_path = dir.join("k6-output.log");
401 let _ = std::fs::write(&log_path, lines.join("\n"));
402 println!("k6 output log saved to: {}", log_path.display());
403 }
404 }
405
406 let results = if let Some(dir) = output_dir {
408 Self::parse_results(dir)?
409 } else {
410 K6Results::default()
411 };
412
413 Ok(results)
414 }
415
416 fn parse_results(output_dir: &Path) -> Result<K6Results> {
418 let summary_path = output_dir.join("summary.json");
419
420 if !summary_path.exists() {
421 return Ok(K6Results::default());
422 }
423
424 let content = std::fs::read_to_string(summary_path)
425 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
426
427 let json: serde_json::Value = serde_json::from_str(&content)
428 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
429
430 let duration_values = &json["metrics"]["http_req_duration"]["values"];
431
432 let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
433 let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
434 let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
435
436 let tcp_connecting = &json["metrics"]["http_req_connecting"]["values"];
447 let tls_handshake = &json["metrics"]["http_req_tls_handshaking"]["values"];
448 let mf_conns_opened = &json["metrics"]["mockforge_connections_opened"]["values"]["count"];
449
450 Ok(K6Results {
451 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
452 failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
456 .as_u64()
457 .unwrap_or(0),
458 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
459 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
460 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
461 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
462 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
463 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
464 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
465 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
466 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
467 server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
468 server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
469 server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
470 server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
471 server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
472 server_reported_faults: server_fault.as_u64().unwrap_or(0),
473 tcp_connect_samples: mf_conns_opened.as_u64().unwrap_or(0),
476 tcp_connect_avg_ms: tcp_connecting["avg"].as_f64().unwrap_or(0.0),
477 tcp_connect_max_ms: tcp_connecting["max"].as_f64().unwrap_or(0.0),
478 tls_handshake_samples: if tls_handshake["avg"].as_f64().unwrap_or(0.0) > 0.0 {
480 mf_conns_opened.as_u64().unwrap_or(0)
483 } else {
484 0
485 },
486 tls_handshake_avg_ms: tls_handshake["avg"].as_f64().unwrap_or(0.0),
487 tls_handshake_max_ms: tls_handshake["max"].as_f64().unwrap_or(0.0),
488 iterations_completed: json["metrics"]["iterations"]["values"]["count"]
489 .as_u64()
490 .unwrap_or(0),
491 })
492 }
493}
494
495impl Default for K6Executor {
496 fn default() -> Self {
497 Self::new().expect("k6 not found")
498 }
499}
500
501#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
503pub struct K6Results {
504 pub total_requests: u64,
505 pub failed_requests: u64,
506 pub avg_duration_ms: f64,
507 pub p95_duration_ms: f64,
508 pub p99_duration_ms: f64,
509 pub rps: f64,
510 pub vus_max: u32,
511 pub min_duration_ms: f64,
512 pub max_duration_ms: f64,
513 pub med_duration_ms: f64,
514 pub p90_duration_ms: f64,
515 #[serde(default)]
520 pub server_injected_latency_samples: u64,
521 #[serde(default)]
522 pub server_injected_latency_avg_ms: f64,
523 #[serde(default)]
524 pub server_injected_latency_max_ms: f64,
525 #[serde(default)]
526 pub server_injected_jitter_samples: u64,
527 #[serde(default)]
528 pub server_injected_jitter_avg_ms: f64,
529 #[serde(default)]
531 pub server_reported_faults: u64,
532 #[serde(default)]
538 pub tcp_connect_samples: u64,
539 #[serde(default)]
540 pub tcp_connect_avg_ms: f64,
541 #[serde(default)]
542 pub tcp_connect_max_ms: f64,
543 #[serde(default)]
546 pub tls_handshake_samples: u64,
547 #[serde(default)]
548 pub tls_handshake_avg_ms: f64,
549 #[serde(default)]
550 pub tls_handshake_max_ms: f64,
551 #[serde(default)]
557 pub iterations_completed: u64,
558}
559
560impl K6Results {
561 pub fn error_rate(&self) -> f64 {
563 if self.total_requests == 0 {
564 return 0.0;
565 }
566 (self.failed_requests as f64 / self.total_requests as f64) * 100.0
567 }
568
569 pub fn success_rate(&self) -> f64 {
571 100.0 - self.error_rate()
572 }
573}
574
575#[cfg(test)]
576mod tests {
577 use super::*;
578
579 #[test]
580 fn test_k6_results_error_rate() {
581 let results = K6Results {
582 total_requests: 100,
583 failed_requests: 5,
584 avg_duration_ms: 100.0,
585 p95_duration_ms: 200.0,
586 p99_duration_ms: 300.0,
587 ..Default::default()
588 };
589
590 assert_eq!(results.error_rate(), 5.0);
591 assert_eq!(results.success_rate(), 95.0);
592 }
593
594 #[test]
595 fn test_k6_results_zero_requests() {
596 let results = K6Results::default();
597 assert_eq!(results.error_rate(), 0.0);
598 }
599
600 #[test]
601 fn test_extract_failure_json_raw() {
602 let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
603 let result = extract_failure_json(line).unwrap();
604 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
605 assert_eq!(parsed["check"], "test");
606 }
607
608 #[test]
609 fn test_extract_failure_json_logfmt() {
610 let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
611 let result = extract_failure_json(line).unwrap();
612 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
613 assert_eq!(parsed["check"], "test");
614 assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
615 }
616
617 #[test]
618 fn test_extract_failure_json_no_marker() {
619 assert!(extract_failure_json("just a regular log line").is_none());
620 }
621
622 #[test]
628 fn test_extract_exchange_logfmt_with_backslash_escapes() {
629 let line = r#"time="2026-06-26T10:00:00Z" level=info msg="MOCKFORGE_EXCHANGE:{\"check\":\"u\",\"request\":{\"body\":\"--bnd\\r\\n\\u001a\"}}" source=console"#;
633 let result = extract_exchange_json(line).unwrap();
634 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
635 assert_eq!(parsed["check"], "u");
636 assert_eq!(parsed["request"]["body"], "--bnd\r\n\u{001a}");
639 }
640
641 #[test]
642 fn test_extract_exchange_raw_no_logfmt_wrapping() {
643 let line =
644 r#"MOCKFORGE_EXCHANGE:{"check":"x","request":{"body":""},"response":{"status":200}}"#;
645 let result = extract_exchange_json(line).unwrap();
646 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
647 assert_eq!(parsed["check"], "x");
648 assert_eq!(parsed["response"]["status"], 200);
649 }
650
651 #[test]
655 fn test_extract_exchange_logfmt_tolerates_extra_trailing_fields() {
656 let line = r#"msg="MOCKFORGE_EXCHANGE:{\"check\":\"t\"}" source=console vu=1 iter=0"#;
657 let result = extract_exchange_json(line).unwrap();
658 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
659 assert_eq!(parsed["check"], "t");
660 }
661
662 #[test]
666 fn test_extract_exchange_double_backslash_followed_by_quote() {
667 let line = r#"msg="MOCKFORGE_EXCHANGE:{\"k\":\"a\\\\\\\"x\\\"\"}" source=console"#;
670 let result = extract_exchange_json(line).unwrap();
671 let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
672 assert_eq!(parsed["k"], r#"a\"x""#);
673 }
674}