1use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Returns `true` when the first double quote in `payload` is preceded by a backslash.
///
/// In a logfmt-quoted message every quote of the embedded JSON is escaped, so the
/// very first quote appears as `\"`. In raw JSON the first quote is the structural
/// opening quote of the first key or string and is never escaped.
fn first_quote_is_escaped(payload: &str) -> bool {
    matches!(payload.find('"'), Some(idx) if payload[..idx].ends_with('\\'))
}

/// Extracts the JSON payload that follows `marker` on a single k6 output line.
///
/// Two line shapes are handled:
/// * raw: `<marker>{...json...}` — the payload is returned unchanged;
/// * logfmt: `... msg="<marker>{\"k\":\"v\"}" source=console` — the trailing
///   `" source=console` is removed and the `\\` / `\"` escapes are undone.
///
/// Returns `None` when the marker is absent or nothing but whitespace follows it.
fn extract_marked_json(line: &str, marker: &str) -> Option<String> {
    const LOGFMT_SUFFIX: &str = "\" source=console";

    let (_, rest) = line.split_once(marker)?;
    // Trim first so trailing whitespace (e.g. a stray `\r`) cannot hide the suffix.
    let rest = rest.trim_end();
    let payload = rest.strip_suffix(LOGFMT_SUFFIX).unwrap_or(rest).trim();
    if payload.is_empty() {
        return None;
    }

    // Only unescape payloads that really are logfmt-escaped. Merely containing `\"`
    // is not enough: raw JSON with an escaped string inside (such as a JSON response
    // body) also contains `\"`, and unescaping it would produce invalid JSON.
    if payload.starts_with('{') && first_quote_is_escaped(payload) {
        // Order matters: collapse `\\` first, then turn the remaining `\"` into `"`.
        Some(payload.replace("\\\\", "\\").replace("\\\"", "\""))
    } else {
        Some(payload.to_string())
    }
}

/// Extracts the JSON text following a `MOCKFORGE_EXCHANGE:` marker in a k6 output line.
///
/// Returns `None` when the line has no such marker or the payload is empty.
fn extract_exchange_json(line: &str) -> Option<String> {
    extract_marked_json(line, "MOCKFORGE_EXCHANGE:")
}

/// Extracts the JSON text following a `MOCKFORGE_FAILURE:` marker in a k6 output line.
///
/// Returns `None` when the line has no such marker or the payload is empty.
fn extract_failure_json(line: &str) -> Option<String> {
    extract_marked_json(line, "MOCKFORGE_FAILURE:")
}
52
/// Runs k6 load tests by spawning the k6 executable as a child process.
///
/// Construct with [`K6Executor::new`], which resolves the binary via
/// `which::which("k6")`.
#[derive(Debug, Clone)]
pub struct K6Executor {
    /// Path of the k6 executable used for every spawned command.
    k6_path: String,
    /// Value forwarded to k6's `--local-ips` flag; the flag is omitted when empty.
    local_ips: String,
}
63
64impl K6Executor {
65 pub fn new() -> Result<Self> {
67 let k6_path = which::which("k6")
68 .map_err(|_| BenchError::K6NotFound)?
69 .to_string_lossy()
70 .to_string();
71
72 Ok(Self {
73 k6_path,
74 local_ips: String::new(),
75 })
76 }
77
78 pub fn with_local_ips(mut self, local_ips: impl Into<String>) -> Self {
82 self.local_ips = local_ips.into();
83 self
84 }
85
86 pub fn is_k6_installed() -> bool {
88 which::which("k6").is_ok()
89 }
90
91 pub async fn get_version(&self) -> Result<String> {
93 let output = TokioCommand::new(&self.k6_path)
94 .arg("version")
95 .output()
96 .await
97 .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
98
99 if !output.status.success() {
100 return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
101 }
102
103 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
104 }
105
106 pub async fn execute(
113 &self,
114 script_path: &Path,
115 output_dir: Option<&Path>,
116 verbose: bool,
117 ) -> Result<K6Results> {
118 self.execute_with_port(script_path, output_dir, verbose, None).await
119 }
120
121 pub async fn execute_with_port(
123 &self,
124 script_path: &Path,
125 output_dir: Option<&Path>,
126 verbose: bool,
127 api_port: Option<u16>,
128 ) -> Result<K6Results> {
129 println!("Starting load test...\n");
130
131 let mut cmd = TokioCommand::new(&self.k6_path);
132 cmd.arg("run");
133
134 if let Some(port) = api_port {
137 cmd.arg("--address").arg(format!("localhost:{}", port));
138 }
139
140 if !self.local_ips.is_empty() {
145 cmd.arg("--local-ips").arg(&self.local_ips);
146 }
147
148 if verbose {
155 cmd.arg("--verbose");
156 }
157
158 let abs_script =
160 std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
161 cmd.arg(&abs_script);
162
163 if let Some(dir) = output_dir {
166 cmd.current_dir(dir);
167 }
168
169 cmd.stdout(Stdio::piped());
170 cmd.stderr(Stdio::piped());
171
172 let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
173
174 let stdout = child
175 .stdout
176 .take()
177 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
178
179 let stderr = child
180 .stderr
181 .take()
182 .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
183
184 let stdout_reader = BufReader::new(stdout);
186 let stderr_reader = BufReader::new(stderr);
187
188 let mut stdout_lines = stdout_reader.lines();
189 let mut stderr_lines = stderr_reader.lines();
190
191 let spinner = ProgressBar::new_spinner();
193 spinner.set_style(
194 ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
195 );
196 spinner.set_message("Running load test...");
197
198 let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
201 Arc::new(tokio::sync::Mutex::new(Vec::new()));
202 let fd_stdout = Arc::clone(&failure_details);
203 let fd_stderr = Arc::clone(&failure_details);
204
205 let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
207 Arc::new(tokio::sync::Mutex::new(Vec::new()));
208 let ex_stdout = Arc::clone(&exchange_details);
209 let ex_stderr = Arc::clone(&exchange_details);
210
211 let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
213 Arc::new(tokio::sync::Mutex::new(Vec::new()));
214 let log_stdout = Arc::clone(&log_lines);
215 let log_stderr = Arc::clone(&log_lines);
216
217 let stdout_handle = tokio::spawn(async move {
219 while let Ok(Some(line)) = stdout_lines.next_line().await {
220 log_stdout.lock().await.push(format!("[stdout] {}", line));
221 if let Some(json_str) = extract_failure_json(&line) {
222 fd_stdout.lock().await.push(json_str);
223 } else if let Some(json_str) = extract_exchange_json(&line) {
224 ex_stdout.lock().await.push(json_str);
225 } else {
226 spinner.set_message(line.clone());
227 if !line.is_empty() && !line.contains("running") && !line.contains("default") {
228 println!("{}", line);
229 }
230 }
231 }
232 spinner.finish_and_clear();
233 });
234
235 let stderr_handle = tokio::spawn(async move {
237 while let Ok(Some(line)) = stderr_lines.next_line().await {
238 if !line.is_empty() {
239 log_stderr.lock().await.push(format!("[stderr] {}", line));
240 if let Some(json_str) = extract_failure_json(&line) {
241 fd_stderr.lock().await.push(json_str);
242 } else if let Some(json_str) = extract_exchange_json(&line) {
243 ex_stderr.lock().await.push(json_str);
244 } else {
245 eprintln!("{}", line);
246 }
247 }
248 }
249 });
250
251 let status =
253 child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
254
255 let _ = stdout_handle.await;
257 let _ = stderr_handle.await;
258
259 let exit_code = status.code().unwrap_or(-1);
262 if !status.success() && exit_code != 99 {
263 return Err(BenchError::K6ExecutionFailed(format!(
264 "k6 exited with status: {}",
265 status
266 )));
267 }
268 if exit_code == 99 {
269 tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
270 }
271
272 if let Some(dir) = output_dir {
274 let details = failure_details.lock().await;
275 if !details.is_empty() {
276 let failure_path = dir.join("conformance-failure-details.json");
277 let parsed: Vec<serde_json::Value> =
278 details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
279 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
280 let _ = std::fs::write(&failure_path, json);
281 }
282 }
283
284 let exchanges = exchange_details.lock().await;
286 if !exchanges.is_empty() {
287 let exchange_path = dir.join("conformance-requests.json");
288 let parsed: Vec<serde_json::Value> =
289 exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
290 if let Ok(json) = serde_json::to_string_pretty(&parsed) {
291 let _ = std::fs::write(&exchange_path, json);
292 tracing::info!(
293 "Exported {} request/response pairs to {}",
294 parsed.len(),
295 exchange_path.display()
296 );
297 }
298 }
299
300 let lines = log_lines.lock().await;
302 if !lines.is_empty() {
303 let log_path = dir.join("k6-output.log");
304 let _ = std::fs::write(&log_path, lines.join("\n"));
305 println!("k6 output log saved to: {}", log_path.display());
306 }
307 }
308
309 let results = if let Some(dir) = output_dir {
311 Self::parse_results(dir)?
312 } else {
313 K6Results::default()
314 };
315
316 Ok(results)
317 }
318
319 fn parse_results(output_dir: &Path) -> Result<K6Results> {
321 let summary_path = output_dir.join("summary.json");
322
323 if !summary_path.exists() {
324 return Ok(K6Results::default());
325 }
326
327 let content = std::fs::read_to_string(summary_path)
328 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
329
330 let json: serde_json::Value = serde_json::from_str(&content)
331 .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
332
333 let duration_values = &json["metrics"]["http_req_duration"]["values"];
334
335 let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
336 let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
337 let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
338
339 let tcp_connecting = &json["metrics"]["http_req_connecting"]["values"];
350 let tls_handshake = &json["metrics"]["http_req_tls_handshaking"]["values"];
351 let mf_conns_opened = &json["metrics"]["mockforge_connections_opened"]["values"]["count"];
352
353 Ok(K6Results {
354 total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
355 failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
359 .as_u64()
360 .unwrap_or(0),
361 avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
362 p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
363 p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
364 rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
365 vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
366 min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
367 max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
368 med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
369 p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
370 server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
371 server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
372 server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
373 server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
374 server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
375 server_reported_faults: server_fault.as_u64().unwrap_or(0),
376 tcp_connect_samples: mf_conns_opened.as_u64().unwrap_or(0),
379 tcp_connect_avg_ms: tcp_connecting["avg"].as_f64().unwrap_or(0.0),
380 tcp_connect_max_ms: tcp_connecting["max"].as_f64().unwrap_or(0.0),
381 tls_handshake_samples: if tls_handshake["avg"].as_f64().unwrap_or(0.0) > 0.0 {
383 mf_conns_opened.as_u64().unwrap_or(0)
386 } else {
387 0
388 },
389 tls_handshake_avg_ms: tls_handshake["avg"].as_f64().unwrap_or(0.0),
390 tls_handshake_max_ms: tls_handshake["max"].as_f64().unwrap_or(0.0),
391 iterations_completed: json["metrics"]["iterations"]["values"]["count"]
392 .as_u64()
393 .unwrap_or(0),
394 })
395 }
396}
397
398impl Default for K6Executor {
399 fn default() -> Self {
400 Self::new().expect("k6 not found")
401 }
402}
403
/// Aggregated metrics of one k6 run, as read from the run's `summary.json`.
///
/// Every field defaults to zero. Fields marked `#[serde(default)]` may be absent
/// from deserialized input and then take their default value.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct K6Results {
    /// `http_reqs` count.
    pub total_requests: u64,
    /// `http_req_failed` `passes` value.
    pub failed_requests: u64,
    /// `http_req_duration` average.
    pub avg_duration_ms: f64,
    /// `http_req_duration` 95th percentile (`p(95)`).
    pub p95_duration_ms: f64,
    /// `http_req_duration` 99th percentile (`p(99)`).
    pub p99_duration_ms: f64,
    /// `http_reqs` rate.
    pub rps: f64,
    /// `vus_max` value.
    pub vus_max: u32,
    /// `http_req_duration` minimum.
    pub min_duration_ms: f64,
    /// `http_req_duration` maximum.
    pub max_duration_ms: f64,
    /// `http_req_duration` median (`med`).
    pub med_duration_ms: f64,
    /// `http_req_duration` 90th percentile (`p(90)`).
    pub p90_duration_ms: f64,
    /// `mockforge_server_injected_latency_ms` sample count.
    #[serde(default)]
    pub server_injected_latency_samples: u64,
    /// `mockforge_server_injected_latency_ms` average.
    #[serde(default)]
    pub server_injected_latency_avg_ms: f64,
    /// `mockforge_server_injected_latency_ms` maximum.
    #[serde(default)]
    pub server_injected_latency_max_ms: f64,
    /// `mockforge_server_injected_jitter_ms` sample count.
    #[serde(default)]
    pub server_injected_jitter_samples: u64,
    /// `mockforge_server_injected_jitter_ms` average.
    #[serde(default)]
    pub server_injected_jitter_avg_ms: f64,
    /// `mockforge_server_fault_total` count.
    #[serde(default)]
    pub server_reported_faults: u64,
    /// `mockforge_connections_opened` count (not a count of `http_req_connecting` samples).
    #[serde(default)]
    pub tcp_connect_samples: u64,
    /// `http_req_connecting` average.
    #[serde(default)]
    pub tcp_connect_avg_ms: f64,
    /// `http_req_connecting` maximum.
    #[serde(default)]
    pub tcp_connect_max_ms: f64,
    /// `mockforge_connections_opened` count when the average TLS handshake time is
    /// positive, otherwise 0.
    #[serde(default)]
    pub tls_handshake_samples: u64,
    /// `http_req_tls_handshaking` average.
    #[serde(default)]
    pub tls_handshake_avg_ms: f64,
    /// `http_req_tls_handshaking` maximum.
    #[serde(default)]
    pub tls_handshake_max_ms: f64,
    /// `iterations` count.
    #[serde(default)]
    pub iterations_completed: u64,
}
462
463impl K6Results {
464 pub fn error_rate(&self) -> f64 {
466 if self.total_requests == 0 {
467 return 0.0;
468 }
469 (self.failed_requests as f64 / self.total_requests as f64) * 100.0
470 }
471
472 pub fn success_rate(&self) -> f64 {
474 100.0 - self.error_rate()
475 }
476}
477
#[cfg(test)]
mod tests {
    use super::*;

    /// 5 failures out of 100 requests give a 5% error rate and 95% success rate.
    #[test]
    fn test_k6_results_error_rate() {
        let sample = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..Default::default()
        };

        let (errors, successes) = (sample.error_rate(), sample.success_rate());
        assert_eq!(errors, 5.0);
        assert_eq!(successes, 95.0);
    }

    /// With no requests recorded the error rate is zero rather than NaN.
    #[test]
    fn test_k6_results_zero_requests() {
        let empty = K6Results::default();
        assert_eq!(empty.error_rate(), 0.0);
    }

    /// A marker followed directly by plain JSON is returned as parseable JSON.
    #[test]
    fn test_extract_failure_json_raw() {
        let raw_line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
        let payload = extract_failure_json(raw_line).unwrap();
        let value = serde_json::from_str::<serde_json::Value>(&payload).unwrap();
        assert_eq!(value["check"], "test");
    }

    /// A logfmt-wrapped message has its escaping undone, including the nested
    /// JSON string carried in the response body.
    #[test]
    fn test_extract_failure_json_logfmt() {
        let logfmt_line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
        let payload = extract_failure_json(logfmt_line).unwrap();
        let value = serde_json::from_str::<serde_json::Value>(&payload).unwrap();
        assert_eq!(value["check"], "test");
        assert_eq!(value["response"]["body"], r#"{"key":"val"}"#);
    }

    /// Lines without the marker yield nothing.
    #[test]
    fn test_extract_failure_json_no_marker() {
        let extracted = extract_failure_json("just a regular log line");
        assert!(extracted.is_none());
    }
}