Skip to main content

mockforge_bench/
executor.rs

1//! k6 execution and output handling
2
3use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Pull the payload that follows `marker` out of a single k6 output line.
///
/// k6 may format `console.log` lines differently depending on output mode:
/// - Raw: `<MARKER>{...}`
/// - Logfmt: `time="..." level=info msg="<MARKER>{...}" source=console`
///
/// In logfmt mode the message is wrapped in quotes, so inner quotes arrive as
/// `\"` and backslashes as `\\`; those are unescaped before returning. Raw
/// payloads are returned untouched, even when they legitimately contain `\"`
/// (for example a JSON string field that itself holds serialized JSON).
///
/// Returns `None` when the marker is absent or nothing follows it.
fn extract_marker_json(line: &str, marker: &str) -> Option<String> {
    let start = line.find(marker)?;
    let payload = line[start + marker.len()..].trim_end();
    // Trim trailing `" source=console` if present (k6 logfmt).
    let payload = payload.strip_suffix("\" source=console").unwrap_or(payload).trim();
    if payload.is_empty() {
        return None;
    }

    // A logfmt-escaped object opens with `{\"`. Valid raw JSON can never have a
    // backslash directly after the opening brace, so this distinguishes the two
    // modes without mangling raw payloads that merely contain `\"`.
    let is_logfmt_escaped = payload
        .strip_prefix('{')
        .map_or(false, |rest| rest.trim_start().starts_with('\\'));

    if is_logfmt_escaped {
        // Unescape in order: backslashes first, then quotes.
        Some(payload.replace("\\\\", "\\").replace("\\\"", "\""))
    } else {
        Some(payload.to_string())
    }
}

/// Extract `MOCKFORGE_EXCHANGE:` JSON payload from a k6 output line (--export-requests).
///
/// Accepts both raw and logfmt-wrapped lines; returns `None` when the line
/// carries no `MOCKFORGE_EXCHANGE:` marker or the payload is empty.
fn extract_exchange_json(line: &str) -> Option<String> {
    extract_marker_json(line, "MOCKFORGE_EXCHANGE:")
}

/// Extract `MOCKFORGE_FAILURE:` JSON payload from a k6 output line.
///
/// k6 may format console.log lines differently depending on output mode:
/// - Raw: `MOCKFORGE_FAILURE:{...}`
/// - Logfmt: `time="..." level=info msg="MOCKFORGE_FAILURE:{...}" source=console`
///
/// Returns `None` when the line carries no `MOCKFORGE_FAILURE:` marker or the
/// payload is empty.
fn extract_failure_json(line: &str) -> Option<String> {
    extract_marker_json(line, "MOCKFORGE_FAILURE:")
}
52
/// k6 executor: runs load-test scripts through an installed `k6` binary.
#[derive(Debug, Clone)]
pub struct K6Executor {
    /// Path of the `k6` executable, as resolved in [`K6Executor::new`].
    k6_path: String,
}
57
58impl K6Executor {
59    /// Create a new k6 executor
60    pub fn new() -> Result<Self> {
61        let k6_path = which::which("k6")
62            .map_err(|_| BenchError::K6NotFound)?
63            .to_string_lossy()
64            .to_string();
65
66        Ok(Self { k6_path })
67    }
68
    /// Check if k6 is installed.
    ///
    /// Returns `true` when `which::which("k6")` succeeds. This performs the same
    /// lookup as [`K6Executor::new`] without constructing an executor.
    pub fn is_k6_installed() -> bool {
        which::which("k6").is_ok()
    }
73
74    /// Get k6 version
75    pub async fn get_version(&self) -> Result<String> {
76        let output = TokioCommand::new(&self.k6_path)
77            .arg("version")
78            .output()
79            .await
80            .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
81
82        if !output.status.success() {
83            return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
84        }
85
86        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
87    }
88
    /// Execute a k6 script.
    ///
    /// Convenience wrapper around [`Self::execute_with_port`] that passes `None`
    /// for `api_port`, so no `--address` override is added to the k6 command
    /// line. Use `execute_with_port` directly when several k6 instances run in
    /// parallel and each needs its own API server port.
    ///
    /// - `script_path` — the k6 script to run.
    /// - `output_dir` — forwarded unchanged to `execute_with_port`.
    /// - `verbose` — forwarded unchanged to `execute_with_port`.
    pub async fn execute(
        &self,
        script_path: &Path,
        output_dir: Option<&Path>,
        verbose: bool,
    ) -> Result<K6Results> {
        self.execute_with_port(script_path, output_dir, verbose, None).await
    }
103
104    /// Execute a k6 script with an optional custom API server port.
105    pub async fn execute_with_port(
106        &self,
107        script_path: &Path,
108        output_dir: Option<&Path>,
109        verbose: bool,
110        api_port: Option<u16>,
111    ) -> Result<K6Results> {
112        println!("Starting load test...\n");
113
114        let mut cmd = TokioCommand::new(&self.k6_path);
115        cmd.arg("run");
116
117        // When running multiple k6 instances in parallel, each needs its own API server port
118        // to avoid "bind: address already in use" on the default port 6565.
119        if let Some(port) = api_port {
120            cmd.arg("--address").arg(format!("localhost:{}", port));
121        }
122
123        // summary.json is written by the k6 script's handleSummary() function
124        // (relative to CWD, set to output_dir below). We no longer use
125        // --summary-export as it's deprecated in newer k6 versions and
126        // conflicts with handleSummary when both try to write the same file.
127
128        // Add verbosity
129        if verbose {
130            cmd.arg("--verbose");
131        }
132
133        // Use absolute path for the script so it's found regardless of CWD.
134        let abs_script =
135            std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
136        cmd.arg(&abs_script);
137
138        // Set working directory to output dir so handleSummary's relative
139        // "summary.json" path lands next to the script.
140        if let Some(dir) = output_dir {
141            cmd.current_dir(dir);
142        }
143
144        cmd.stdout(Stdio::piped());
145        cmd.stderr(Stdio::piped());
146
147        let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
148
149        let stdout = child
150            .stdout
151            .take()
152            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
153
154        let stderr = child
155            .stderr
156            .take()
157            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
158
159        // Stream output
160        let stdout_reader = BufReader::new(stdout);
161        let stderr_reader = BufReader::new(stderr);
162
163        let mut stdout_lines = stdout_reader.lines();
164        let mut stderr_lines = stderr_reader.lines();
165
166        // Create progress indicator
167        let spinner = ProgressBar::new_spinner();
168        spinner.set_style(
169            ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
170        );
171        spinner.set_message("Running load test...");
172
173        // Collect failure details from k6's console.log output
174        // k6 may emit console.log to either stdout or stderr depending on version/config
175        let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
176            Arc::new(tokio::sync::Mutex::new(Vec::new()));
177        let fd_stdout = Arc::clone(&failure_details);
178        let fd_stderr = Arc::clone(&failure_details);
179
180        // Collect request/response exchanges for --export-requests
181        let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
182            Arc::new(tokio::sync::Mutex::new(Vec::new()));
183        let ex_stdout = Arc::clone(&exchange_details);
184        let ex_stderr = Arc::clone(&exchange_details);
185
186        // Collect all k6 output for saving to a log file
187        let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
188            Arc::new(tokio::sync::Mutex::new(Vec::new()));
189        let log_stdout = Arc::clone(&log_lines);
190        let log_stderr = Arc::clone(&log_lines);
191
192        // Read stdout lines, capturing MOCKFORGE_FAILURE and MOCKFORGE_EXCHANGE markers
193        let stdout_handle = tokio::spawn(async move {
194            while let Ok(Some(line)) = stdout_lines.next_line().await {
195                log_stdout.lock().await.push(format!("[stdout] {}", line));
196                if let Some(json_str) = extract_failure_json(&line) {
197                    fd_stdout.lock().await.push(json_str);
198                } else if let Some(json_str) = extract_exchange_json(&line) {
199                    ex_stdout.lock().await.push(json_str);
200                } else {
201                    spinner.set_message(line.clone());
202                    if !line.is_empty() && !line.contains("running") && !line.contains("default") {
203                        println!("{}", line);
204                    }
205                }
206            }
207            spinner.finish_and_clear();
208        });
209
210        // Read stderr lines, capturing MOCKFORGE_FAILURE and MOCKFORGE_EXCHANGE markers
211        let stderr_handle = tokio::spawn(async move {
212            while let Ok(Some(line)) = stderr_lines.next_line().await {
213                if !line.is_empty() {
214                    log_stderr.lock().await.push(format!("[stderr] {}", line));
215                    if let Some(json_str) = extract_failure_json(&line) {
216                        fd_stderr.lock().await.push(json_str);
217                    } else if let Some(json_str) = extract_exchange_json(&line) {
218                        ex_stderr.lock().await.push(json_str);
219                    } else {
220                        eprintln!("{}", line);
221                    }
222                }
223            }
224        });
225
226        // Wait for completion
227        let status =
228            child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
229
230        // Wait for both reader tasks to finish processing all lines
231        let _ = stdout_handle.await;
232        let _ = stderr_handle.await;
233
234        // k6 exit code 99 = thresholds crossed. The test DID run and summary.json
235        // should still be present. Only treat non-99 failures as hard errors.
236        let exit_code = status.code().unwrap_or(-1);
237        if !status.success() && exit_code != 99 {
238            return Err(BenchError::K6ExecutionFailed(format!(
239                "k6 exited with status: {}",
240                status
241            )));
242        }
243        if exit_code == 99 {
244            tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
245        }
246
247        // Write failure details to file if any were captured
248        if let Some(dir) = output_dir {
249            let details = failure_details.lock().await;
250            if !details.is_empty() {
251                let failure_path = dir.join("conformance-failure-details.json");
252                let parsed: Vec<serde_json::Value> =
253                    details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
254                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
255                    let _ = std::fs::write(&failure_path, json);
256                }
257            }
258
259            // Write exchange details (--export-requests) if any were captured
260            let exchanges = exchange_details.lock().await;
261            if !exchanges.is_empty() {
262                let exchange_path = dir.join("conformance-requests.json");
263                let parsed: Vec<serde_json::Value> =
264                    exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
265                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
266                    let _ = std::fs::write(&exchange_path, json);
267                    tracing::info!(
268                        "Exported {} request/response pairs to {}",
269                        parsed.len(),
270                        exchange_path.display()
271                    );
272                }
273            }
274
275            // Save full k6 output to a log file for debugging
276            let lines = log_lines.lock().await;
277            if !lines.is_empty() {
278                let log_path = dir.join("k6-output.log");
279                let _ = std::fs::write(&log_path, lines.join("\n"));
280                println!("k6 output log saved to: {}", log_path.display());
281            }
282        }
283
284        // Parse results if output directory was specified
285        let results = if let Some(dir) = output_dir {
286            Self::parse_results(dir)?
287        } else {
288            K6Results::default()
289        };
290
291        Ok(results)
292    }
293
294    /// Parse k6 results from JSON output
295    fn parse_results(output_dir: &Path) -> Result<K6Results> {
296        let summary_path = output_dir.join("summary.json");
297
298        if !summary_path.exists() {
299            return Ok(K6Results::default());
300        }
301
302        let content = std::fs::read_to_string(summary_path)
303            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
304
305        let json: serde_json::Value = serde_json::from_str(&content)
306            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
307
308        let duration_values = &json["metrics"]["http_req_duration"]["values"];
309
310        let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
311        let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
312        let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
313
314        Ok(K6Results {
315            total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
316            // k6 Rate metric: `passes` = count of non-zero values.
317            // For http_req_failed, non-zero means the request failed.
318            // So `passes` = failed request count, `fails` = successful request count.
319            failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
320                .as_u64()
321                .unwrap_or(0),
322            avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
323            p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
324            p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
325            rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
326            vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
327            min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
328            max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
329            med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
330            p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
331            server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
332            server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
333            server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
334            server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
335            server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
336            server_reported_faults: server_fault.as_u64().unwrap_or(0),
337        })
338    }
339}
340
impl Default for K6Executor {
    /// Builds an executor via [`K6Executor::new`].
    ///
    /// # Panics
    ///
    /// Panics with the message "k6 not found" when `new` returns an error.
    /// Call `K6Executor::new` directly to handle that case without panicking.
    fn default() -> Self {
        Self::new().expect("k6 not found")
    }
}
346
/// k6 test results.
///
/// Populated by `K6Executor::parse_results` from the `metrics` section of
/// `summary.json`. Every field is zero when the corresponding metric is missing
/// or when no summary file was found.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct K6Results {
    /// `http_reqs` count.
    pub total_requests: u64,
    /// `http_req_failed` `passes` value, i.e. the number of failed requests.
    pub failed_requests: u64,
    /// `http_req_duration` average.
    pub avg_duration_ms: f64,
    /// `http_req_duration` 95th percentile (`p(95)`).
    pub p95_duration_ms: f64,
    /// `http_req_duration` 99th percentile (`p(99)`).
    pub p99_duration_ms: f64,
    /// `http_reqs` rate.
    pub rps: f64,
    /// `vus_max` value.
    pub vus_max: u32,
    /// `http_req_duration` minimum.
    pub min_duration_ms: f64,
    /// `http_req_duration` maximum.
    pub max_duration_ms: f64,
    /// `http_req_duration` median (`med`).
    pub med_duration_ms: f64,
    /// `http_req_duration` 90th percentile (`p(90)`).
    pub p90_duration_ms: f64,
    /// Issue #79 — client-side visibility into MockForge-injected latency,
    /// parsed from the `X-Mockforge-Injected-Latency-Ms` response header that
    /// the chaos middleware sets. Zero when chaos isn't firing or the target
    /// isn't MockForge.
    #[serde(default)]
    pub server_injected_latency_samples: u64,
    /// `mockforge_server_injected_latency_ms` average.
    #[serde(default)]
    pub server_injected_latency_avg_ms: f64,
    /// `mockforge_server_injected_latency_ms` maximum.
    #[serde(default)]
    pub server_injected_latency_max_ms: f64,
    /// `mockforge_server_injected_jitter_ms` sample count.
    #[serde(default)]
    pub server_injected_jitter_samples: u64,
    /// `mockforge_server_injected_jitter_ms` average.
    #[serde(default)]
    pub server_injected_jitter_avg_ms: f64,
    /// Count of responses that carried an `X-Mockforge-Fault` header.
    #[serde(default)]
    pub server_reported_faults: u64,
}
379
380impl K6Results {
381    /// Get error rate as a percentage
382    pub fn error_rate(&self) -> f64 {
383        if self.total_requests == 0 {
384            return 0.0;
385        }
386        (self.failed_requests as f64 / self.total_requests as f64) * 100.0
387    }
388
389    /// Get success rate as a percentage
390    pub fn success_rate(&self) -> f64 {
391        100.0 - self.error_rate()
392    }
393}
394
#[cfg(test)]
mod tests {
    use super::*;

    /// 5 failures out of 100 requests is a 5% error rate and 95% success rate.
    #[test]
    fn test_k6_results_error_rate() {
        let sample = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..K6Results::default()
        };

        let (errors, successes) = (sample.error_rate(), sample.success_rate());
        assert_eq!(errors, 5.0);
        assert_eq!(successes, 95.0);
    }

    /// With no requests recorded the error rate is zero rather than NaN.
    #[test]
    fn test_k6_results_zero_requests() {
        let empty = K6Results::default();
        assert_eq!(empty.error_rate(), 0.0);
    }

    /// A raw (unwrapped) marker line yields parseable JSON.
    #[test]
    fn test_extract_failure_json_raw() {
        let raw_line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
        let payload = extract_failure_json(raw_line).unwrap();
        let value = serde_json::from_str::<serde_json::Value>(&payload).unwrap();
        assert_eq!(value["check"], "test");
    }

    /// A logfmt-wrapped marker line is unescaped, including nested escapes.
    #[test]
    fn test_extract_failure_json_logfmt() {
        let logfmt_line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
        let payload = extract_failure_json(logfmt_line).unwrap();
        let value = serde_json::from_str::<serde_json::Value>(&payload).unwrap();
        assert_eq!(value["check"], "test");
        assert_eq!(value["response"]["body"], r#"{"key":"val"}"#);
    }

    /// Lines without the marker produce no payload.
    #[test]
    fn test_extract_failure_json_no_marker() {
        let extracted = extract_failure_json("just a regular log line");
        assert!(extracted.is_none());
    }
}