Skip to main content

mockforge_bench/
executor.rs

1//! k6 execution and output handling
2
3use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Undo k6 logfmt quoting: `\\` becomes `\` and `\"` becomes `"`.
///
/// The input is scanned once, left to right, so a character produced by one
/// unescape is never re-interpreted as the start of another escape. Any other
/// backslash sequence (for example `\n`) is copied through unchanged.
fn unescape_logfmt(escaped: &str) -> String {
    let mut out = String::with_capacity(escaped.len());
    let mut chars = escaped.chars().peekable();
    while let Some(c) = chars.next() {
        match (c, chars.peek().copied()) {
            ('\\', Some(next @ ('\\' | '"'))) => {
                out.push(next);
                chars.next();
            }
            _ => out.push(c),
        }
    }
    out
}

/// Extract the payload that follows `marker` on a k6 output line.
///
/// k6 may format console.log lines differently depending on output mode:
/// - Raw: `<MARKER>{...}`
/// - Logfmt: `time="..." level=info msg="<MARKER>{...}" source=console`
///
/// Returns `None` when the marker is absent or nothing but whitespace follows
/// it. Otherwise returns the payload text, unescaped when it came from a
/// logfmt line. The payload is not validated as JSON here.
fn extract_marked_json(line: &str, marker: &str) -> Option<String> {
    let (_, after_marker) = line.split_once(marker)?;

    // Trim trailing whitespace first so the logfmt suffix is still recognised
    // when the line carries trailing spaces or a stray `\r`.
    let payload = after_marker.trim_end();
    let payload = payload.strip_suffix("\" source=console").unwrap_or(payload).trim();
    if payload.is_empty() {
        return None;
    }

    // A logfmt-quoted JSON object has its very first key quote escaped, so it
    // starts with `{\"`; a raw JSON object starts with `{"`. Merely checking
    // for `\"` anywhere in the payload is not enough: raw JSON legitimately
    // contains `\"` inside string values (e.g. a stringified response body),
    // and unescaping that would corrupt it.
    let is_logfmt_escaped = payload
        .strip_prefix('{')
        .map_or(false, |rest| rest.trim_start().starts_with("\\\""));

    if is_logfmt_escaped {
        Some(unescape_logfmt(payload))
    } else {
        Some(payload.to_string())
    }
}

/// Extract `MOCKFORGE_EXCHANGE:` JSON payload from a k6 output line (--export-requests).
///
/// Handles both the raw and the logfmt line shapes; returns `None` when the
/// marker is missing or the payload is empty.
fn extract_exchange_json(line: &str) -> Option<String> {
    extract_marked_json(line, "MOCKFORGE_EXCHANGE:")
}

/// Extract `MOCKFORGE_FAILURE:` JSON payload from a k6 output line.
///
/// k6 may format console.log lines differently depending on output mode:
/// - Raw: `MOCKFORGE_FAILURE:{...}`
/// - Logfmt: `time="..." level=info msg="MOCKFORGE_FAILURE:{...}" source=console`
///
/// Returns `None` when the marker is missing or the payload is empty.
fn extract_failure_json(line: &str) -> Option<String> {
    extract_marked_json(line, "MOCKFORGE_FAILURE:")
}
52
/// Runs the external `k6` binary and collects its output.
///
/// Holds the path of the `k6` executable plus options that are forwarded to
/// every `k6 run` invocation made through this executor.
pub struct K6Executor {
    /// Path of the `k6` executable, as a (lossily converted) string.
    k6_path: String,
    /// Comma-joined IPs/ranges/CIDRs forwarded to `k6 run --local-ips`.
    /// Empty → flag omitted. Populated by callers that pass through the
    /// CLI's `--source-ip`; lets a VU make requests from one of several
    /// bound interfaces (k6 supports this natively).
    local_ips: String,
}
63
64impl K6Executor {
65    /// Create a new k6 executor
66    pub fn new() -> Result<Self> {
67        let k6_path = which::which("k6")
68            .map_err(|_| BenchError::K6NotFound)?
69            .to_string_lossy()
70            .to_string();
71
72        Ok(Self {
73            k6_path,
74            local_ips: String::new(),
75        })
76    }
77
78    /// Set the `--local-ips` value for subsequent k6 invocations.
79    /// Accepts a comma-joined list of IPs, ranges (`10.0.0.1-10.0.0.5`),
80    /// and/or CIDRs (`192.168.0.0/24`) - same syntax k6 expects.
81    pub fn with_local_ips(mut self, local_ips: impl Into<String>) -> Self {
82        self.local_ips = local_ips.into();
83        self
84    }
85
86    /// Check if k6 is installed
87    pub fn is_k6_installed() -> bool {
88        which::which("k6").is_ok()
89    }
90
91    /// Get k6 version
92    pub async fn get_version(&self) -> Result<String> {
93        let output = TokioCommand::new(&self.k6_path)
94            .arg("version")
95            .output()
96            .await
97            .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
98
99        if !output.status.success() {
100            return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
101        }
102
103        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
104    }
105
    /// Execute a k6 script.
    ///
    /// Convenience wrapper around [`Self::execute_with_port`] that passes
    /// `None` for `api_port`, so no `--address` flag is given to k6 and k6
    /// uses its own default API server address. Use `execute_with_port`
    /// directly when several k6 instances run in parallel and each needs its
    /// own API server port.
    ///
    /// `script_path`, `output_dir` and `verbose` are forwarded unchanged; the
    /// result (or error) of `execute_with_port` is returned as-is.
    pub async fn execute(
        &self,
        script_path: &Path,
        output_dir: Option<&Path>,
        verbose: bool,
    ) -> Result<K6Results> {
        self.execute_with_port(script_path, output_dir, verbose, None).await
    }
120
121    /// Execute a k6 script with an optional custom API server port.
122    pub async fn execute_with_port(
123        &self,
124        script_path: &Path,
125        output_dir: Option<&Path>,
126        verbose: bool,
127        api_port: Option<u16>,
128    ) -> Result<K6Results> {
129        println!("Starting load test...\n");
130
131        let mut cmd = TokioCommand::new(&self.k6_path);
132        cmd.arg("run");
133
134        // When running multiple k6 instances in parallel, each needs its own API server port
135        // to avoid "bind: address already in use" on the default port 6565.
136        if let Some(port) = api_port {
137            cmd.arg("--address").arg(format!("localhost:{}", port));
138        }
139
140        // `--local-ips` rotates each VU through a pool of source IPs that
141        // must already be bound on the host (CIDRs/ranges accepted). This
142        // gives the k6 path the same source-IP coverage as the native
143        // self-test driver's `--source-ip`.
144        if !self.local_ips.is_empty() {
145            cmd.arg("--local-ips").arg(&self.local_ips);
146        }
147
148        // summary.json is written by the k6 script's handleSummary() function
149        // (relative to CWD, set to output_dir below). We no longer use
150        // --summary-export as it's deprecated in newer k6 versions and
151        // conflicts with handleSummary when both try to write the same file.
152
153        // Add verbosity
154        if verbose {
155            cmd.arg("--verbose");
156        }
157
158        // Use absolute path for the script so it's found regardless of CWD.
159        let abs_script =
160            std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
161        cmd.arg(&abs_script);
162
163        // Set working directory to output dir so handleSummary's relative
164        // "summary.json" path lands next to the script.
165        if let Some(dir) = output_dir {
166            cmd.current_dir(dir);
167        }
168
169        cmd.stdout(Stdio::piped());
170        cmd.stderr(Stdio::piped());
171
172        let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
173
174        let stdout = child
175            .stdout
176            .take()
177            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
178
179        let stderr = child
180            .stderr
181            .take()
182            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
183
184        // Stream output
185        let stdout_reader = BufReader::new(stdout);
186        let stderr_reader = BufReader::new(stderr);
187
188        let mut stdout_lines = stdout_reader.lines();
189        let mut stderr_lines = stderr_reader.lines();
190
191        // Create progress indicator
192        let spinner = ProgressBar::new_spinner();
193        spinner.set_style(
194            ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
195        );
196        spinner.set_message("Running load test...");
197
198        // Collect failure details from k6's console.log output
199        // k6 may emit console.log to either stdout or stderr depending on version/config
200        let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
201            Arc::new(tokio::sync::Mutex::new(Vec::new()));
202        let fd_stdout = Arc::clone(&failure_details);
203        let fd_stderr = Arc::clone(&failure_details);
204
205        // Collect request/response exchanges for --export-requests
206        let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
207            Arc::new(tokio::sync::Mutex::new(Vec::new()));
208        let ex_stdout = Arc::clone(&exchange_details);
209        let ex_stderr = Arc::clone(&exchange_details);
210
211        // Collect all k6 output for saving to a log file
212        let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
213            Arc::new(tokio::sync::Mutex::new(Vec::new()));
214        let log_stdout = Arc::clone(&log_lines);
215        let log_stderr = Arc::clone(&log_lines);
216
217        // Read stdout lines, capturing MOCKFORGE_FAILURE and MOCKFORGE_EXCHANGE markers
218        let stdout_handle = tokio::spawn(async move {
219            while let Ok(Some(line)) = stdout_lines.next_line().await {
220                log_stdout.lock().await.push(format!("[stdout] {}", line));
221                if let Some(json_str) = extract_failure_json(&line) {
222                    fd_stdout.lock().await.push(json_str);
223                } else if let Some(json_str) = extract_exchange_json(&line) {
224                    ex_stdout.lock().await.push(json_str);
225                } else {
226                    spinner.set_message(line.clone());
227                    if !line.is_empty() && !line.contains("running") && !line.contains("default") {
228                        println!("{}", line);
229                    }
230                }
231            }
232            spinner.finish_and_clear();
233        });
234
235        // Read stderr lines, capturing MOCKFORGE_FAILURE and MOCKFORGE_EXCHANGE markers
236        let stderr_handle = tokio::spawn(async move {
237            while let Ok(Some(line)) = stderr_lines.next_line().await {
238                if !line.is_empty() {
239                    log_stderr.lock().await.push(format!("[stderr] {}", line));
240                    if let Some(json_str) = extract_failure_json(&line) {
241                        fd_stderr.lock().await.push(json_str);
242                    } else if let Some(json_str) = extract_exchange_json(&line) {
243                        ex_stderr.lock().await.push(json_str);
244                    } else {
245                        eprintln!("{}", line);
246                    }
247                }
248            }
249        });
250
251        // Wait for completion
252        let status =
253            child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
254
255        // Wait for both reader tasks to finish processing all lines
256        let _ = stdout_handle.await;
257        let _ = stderr_handle.await;
258
259        // k6 exit code 99 = thresholds crossed. The test DID run and summary.json
260        // should still be present. Only treat non-99 failures as hard errors.
261        let exit_code = status.code().unwrap_or(-1);
262        if !status.success() && exit_code != 99 {
263            return Err(BenchError::K6ExecutionFailed(format!(
264                "k6 exited with status: {}",
265                status
266            )));
267        }
268        if exit_code == 99 {
269            tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
270        }
271
272        // Write failure details to file if any were captured
273        if let Some(dir) = output_dir {
274            let details = failure_details.lock().await;
275            if !details.is_empty() {
276                let failure_path = dir.join("conformance-failure-details.json");
277                let parsed: Vec<serde_json::Value> =
278                    details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
279                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
280                    let _ = std::fs::write(&failure_path, json);
281                }
282            }
283
284            // Write exchange details (--export-requests) if any were captured
285            let exchanges = exchange_details.lock().await;
286            if !exchanges.is_empty() {
287                let exchange_path = dir.join("conformance-requests.json");
288                let parsed: Vec<serde_json::Value> =
289                    exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
290                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
291                    let _ = std::fs::write(&exchange_path, json);
292                    tracing::info!(
293                        "Exported {} request/response pairs to {}",
294                        parsed.len(),
295                        exchange_path.display()
296                    );
297                }
298            }
299
300            // Save full k6 output to a log file for debugging
301            let lines = log_lines.lock().await;
302            if !lines.is_empty() {
303                let log_path = dir.join("k6-output.log");
304                let _ = std::fs::write(&log_path, lines.join("\n"));
305                println!("k6 output log saved to: {}", log_path.display());
306            }
307        }
308
309        // Parse results if output directory was specified
310        let results = if let Some(dir) = output_dir {
311            Self::parse_results(dir)?
312        } else {
313            K6Results::default()
314        };
315
316        Ok(results)
317    }
318
319    /// Parse k6 results from JSON output
320    fn parse_results(output_dir: &Path) -> Result<K6Results> {
321        let summary_path = output_dir.join("summary.json");
322
323        if !summary_path.exists() {
324            return Ok(K6Results::default());
325        }
326
327        let content = std::fs::read_to_string(summary_path)
328            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
329
330        let json: serde_json::Value = serde_json::from_str(&content)
331            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
332
333        let duration_values = &json["metrics"]["http_req_duration"]["values"];
334
335        let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
336        let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
337        let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
338
339        // Issue #79 (round 5) — surface TCP connect / TLS handshake stats and
340        // a connection-rate count for `--cps` runs.
341        //
342        // Round 6 follow-up: k6's `http_req_connecting` Trend doesn't expose a
343        // `count` field in summary.json (only avg/min/med/max/p90/p95), so we
344        // can't use it for "connections opened". The template now feeds a
345        // dedicated Counter, `mockforge_connections_opened`, every time a
346        // request's `res.timings.connecting > 0`. That gives us an accurate
347        // count for both `--cps` (≈ total_requests) and pooled-reuse (≈ vus_max)
348        // runs. The Trend is still useful for the avg/max timing display.
349        let tcp_connecting = &json["metrics"]["http_req_connecting"]["values"];
350        let tls_handshake = &json["metrics"]["http_req_tls_handshaking"]["values"];
351        let mf_conns_opened = &json["metrics"]["mockforge_connections_opened"]["values"]["count"];
352
353        Ok(K6Results {
354            total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
355            // k6 Rate metric: `passes` = count of non-zero values.
356            // For http_req_failed, non-zero means the request failed.
357            // So `passes` = failed request count, `fails` = successful request count.
358            failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
359                .as_u64()
360                .unwrap_or(0),
361            avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
362            p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
363            p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
364            rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
365            vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
366            min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
367            max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
368            med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
369            p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
370            server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
371            server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
372            server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
373            server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
374            server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
375            server_reported_faults: server_fault.as_u64().unwrap_or(0),
376            // Counter from the template, not the Trend's count (which is
377            // absent in k6 summary JSON).
378            tcp_connect_samples: mf_conns_opened.as_u64().unwrap_or(0),
379            tcp_connect_avg_ms: tcp_connecting["avg"].as_f64().unwrap_or(0.0),
380            tcp_connect_max_ms: tcp_connecting["max"].as_f64().unwrap_or(0.0),
381            // TLS handshake Trend has no `count` either; gate display on avg>0.
382            tls_handshake_samples: if tls_handshake["avg"].as_f64().unwrap_or(0.0) > 0.0 {
383                // Use connection count as a proxy — every new TLS session
384                // requires a handshake.
385                mf_conns_opened.as_u64().unwrap_or(0)
386            } else {
387                0
388            },
389            tls_handshake_avg_ms: tls_handshake["avg"].as_f64().unwrap_or(0.0),
390            tls_handshake_max_ms: tls_handshake["max"].as_f64().unwrap_or(0.0),
391            iterations_completed: json["metrics"]["iterations"]["values"]["count"]
392                .as_u64()
393                .unwrap_or(0),
394        })
395    }
396}
397
/// Builds an executor by calling [`K6Executor::new`].
///
/// # Panics
/// Panics with the message `k6 not found` when `K6Executor::new` returns an
/// error. Call `K6Executor::new` directly to handle a missing k6 binary
/// without panicking.
impl Default for K6Executor {
    fn default() -> Self {
        Self::new().expect("k6 not found")
    }
}
403
/// k6 test results
///
/// Populated from the k6 `summary.json` metrics. Every field is zero when the
/// corresponding metric or value is missing from the summary.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct K6Results {
    /// Request count (`http_reqs.values.count`).
    pub total_requests: u64,
    /// Failed request count (`http_req_failed.values.passes`).
    pub failed_requests: u64,
    /// `avg` of the `http_req_duration` metric.
    pub avg_duration_ms: f64,
    /// `p(95)` of the `http_req_duration` metric.
    pub p95_duration_ms: f64,
    /// `p(99)` of the `http_req_duration` metric.
    pub p99_duration_ms: f64,
    /// Request rate (`http_reqs.values.rate`).
    pub rps: f64,
    /// `vus_max.values.value` from the summary.
    pub vus_max: u32,
    /// `min` of the `http_req_duration` metric.
    pub min_duration_ms: f64,
    /// `max` of the `http_req_duration` metric.
    pub max_duration_ms: f64,
    /// `med` of the `http_req_duration` metric.
    pub med_duration_ms: f64,
    /// `p(90)` of the `http_req_duration` metric.
    pub p90_duration_ms: f64,
    /// Issue #79 — client-side visibility into MockForge-injected latency,
    /// parsed from the `X-Mockforge-Injected-Latency-Ms` response header that
    /// the chaos middleware sets. Zero when chaos isn't firing or the target
    /// isn't MockForge.
    #[serde(default)]
    pub server_injected_latency_samples: u64,
    /// `avg` of the `mockforge_server_injected_latency_ms` metric.
    #[serde(default)]
    pub server_injected_latency_avg_ms: f64,
    /// `max` of the `mockforge_server_injected_latency_ms` metric.
    #[serde(default)]
    pub server_injected_latency_max_ms: f64,
    /// `count` of the `mockforge_server_injected_jitter_ms` metric.
    #[serde(default)]
    pub server_injected_jitter_samples: u64,
    /// `avg` of the `mockforge_server_injected_jitter_ms` metric.
    #[serde(default)]
    pub server_injected_jitter_avg_ms: f64,
    /// Count of responses that carried an `X-Mockforge-Fault` header.
    #[serde(default)]
    pub server_reported_faults: u64,
    /// Issue #79 (round 5) — TCP connect samples / timing. With `--cps`
    /// (`noConnectionReuse: true`) k6 records one connect per request, so
    /// `tcp_connect_samples` equals connections opened. Without `--cps` this
    /// is typically a small count (k6 reuses pooled connections), so it tells
    /// you whether reuse was actually happening.
    ///
    /// Read from the `mockforge_connections_opened` counter in the summary.
    #[serde(default)]
    pub tcp_connect_samples: u64,
    /// `avg` of the `http_req_connecting` metric.
    #[serde(default)]
    pub tcp_connect_avg_ms: f64,
    /// `max` of the `http_req_connecting` metric.
    #[serde(default)]
    pub tcp_connect_max_ms: f64,
    /// TLS handshake samples / timing — same shape as TCP connect, but only
    /// non-zero for HTTPS targets.
    ///
    /// Set to the `mockforge_connections_opened` count when the
    /// `http_req_tls_handshaking` average is above zero, otherwise zero.
    #[serde(default)]
    pub tls_handshake_samples: u64,
    /// `avg` of the `http_req_tls_handshaking` metric.
    #[serde(default)]
    pub tls_handshake_avg_ms: f64,
    /// `max` of the `http_req_tls_handshaking` metric.
    #[serde(default)]
    pub tls_handshake_max_ms: f64,
    /// Issue #79 round 10 — k6 iteration counter from `iterations.values.count`.
    /// For `constant-arrival-rate` (`--rps`), this is the number of full
    /// iterations completed within the duration. When `iterations × num_ops`
    /// is much less than `total_requests`, mid-iteration cancellation truncated
    /// the run and not every operation in the spec was exercised.
    #[serde(default)]
    pub iterations_completed: u64,
}
462
463impl K6Results {
464    /// Get error rate as a percentage
465    pub fn error_rate(&self) -> f64 {
466        if self.total_requests == 0 {
467            return 0.0;
468        }
469        (self.failed_requests as f64 / self.total_requests as f64) * 100.0
470    }
471
472    /// Get success rate as a percentage
473    pub fn success_rate(&self) -> f64 {
474        100.0 - self.error_rate()
475    }
476}
477
/// Unit tests for result arithmetic and for the k6 output-line extractors.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_k6_results_error_rate() {
        let results = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..Default::default()
        };

        assert_eq!(results.error_rate(), 5.0);
        assert_eq!(results.success_rate(), 95.0);
    }

    #[test]
    fn test_k6_results_zero_requests() {
        let results = K6Results::default();
        assert_eq!(results.error_rate(), 0.0);
        // With no requests nothing failed, so the success rate is 100%.
        assert_eq!(results.success_rate(), 100.0);
    }

    #[test]
    fn test_extract_failure_json_raw() {
        let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
        let result = extract_failure_json(line).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
        assert_eq!(parsed["check"], "test");
    }

    #[test]
    fn test_extract_failure_json_logfmt() {
        let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
        let result = extract_failure_json(line).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
        assert_eq!(parsed["check"], "test");
        assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
    }

    #[test]
    fn test_extract_failure_json_no_marker() {
        assert!(extract_failure_json("just a regular log line").is_none());
    }

    #[test]
    fn test_extract_failure_json_empty_payload() {
        // A marker with nothing after it carries no payload.
        assert!(extract_failure_json("MOCKFORGE_FAILURE:").is_none());
        assert!(extract_failure_json("MOCKFORGE_FAILURE:   ").is_none());
    }

    #[test]
    fn test_extract_exchange_json_raw() {
        let line = r#"MOCKFORGE_EXCHANGE:{"method":"GET","status":200}"#;
        let result = extract_exchange_json(line).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
        assert_eq!(parsed["method"], "GET");
        assert_eq!(parsed["status"], 200);
    }

    #[test]
    fn test_extract_exchange_json_logfmt() {
        let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_EXCHANGE:{\"method\":\"GET\",\"status\":200}" source=console"#;
        let result = extract_exchange_json(line).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
        assert_eq!(parsed["method"], "GET");
        assert_eq!(parsed["status"], 200);
    }

    #[test]
    fn test_extract_exchange_json_no_marker() {
        assert!(extract_exchange_json("just a regular log line").is_none());
        // The two markers are distinct: a failure line is not an exchange line.
        assert!(extract_exchange_json(r#"MOCKFORGE_FAILURE:{"check":"test"}"#).is_none());
    }
}