Skip to main content

mockforge_bench/
executor.rs

1//! k6 execution and output handling
2
3use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
11/// Extract a `MOCKFORGE_<KIND>:` JSON payload from a k6 output line.
12///
13/// k6 emits these via `console.log`, which goes through one of two paths
14/// depending on the runner config:
15/// - **Raw**: `MOCKFORGE_EXCHANGE:{"check":"...", ...}` straight to stdout.
16/// - **Logfmt**: `time="..." level=info msg="MOCKFORGE_EXCHANGE:{...}" source=console`
17///   where the JSON's `"` are escaped as `\"` and `\` as `\\` so it fits
18///   inside the `msg="..."` field.
19///
20/// Round 46 (#79) — Srikanth on 0.3.190: a multipart upload landed `[]`
21/// in `conformance-requests.json` even though `MOCKFORGE_EXCHANGE:` was
22/// present in the k6 log. Root cause: the previous parser used a naive
23/// `replace("\\\\", "\\").replace("\\\"", "\"")` chain. With binary
24/// multipart bytes the JSON content includes sequences like `\\"`
25/// (literal backslash followed by literal quote inside a JSON string),
26/// which logfmt-escapes to `\\\\\"`. The replace chain processed `\\`
27/// → `\` first, leaving `\\\"`, then `\"` → `"`, mangling the JSON.
28/// Replaced with a single character walk that consumes one logfmt
29/// escape at a time. Also rewrote the suffix-strip to scan for the
30/// matching closing `"` of the `msg="..."` field instead of a
31/// fixed-string suffix so we tolerate any trailing logfmt fields k6
32/// might add.
33fn extract_mockforge_marker_json(line: &str, marker: &str) -> Option<String> {
34    let start = line.find(marker)?;
35    let json_start = start + marker.len();
36    let rest = &line[json_start..];
37
38    // Is this the logfmt-wrapped form? The `msg="` opener sits 5 bytes
39    // before the marker. (Plain `msg=MOCKFORGE_...` would also be valid
40    // logfmt for a value with no spaces, but k6 always quote-wraps.)
41    let is_logfmt = start >= 5 && line.as_bytes().get(start - 5..start) == Some(b"msg=\"");
42    if is_logfmt {
43        // Walk forward until the unescaped closing `"` of msg=. Inside
44        // the field, `\\` is one escaped backslash and `\"` is one
45        // escaped quote — those bytes belong to the JSON content. Any
46        // unescaped `"` is the field terminator.
47        let bytes = rest.as_bytes();
48        let mut i = 0;
49        let mut out = String::with_capacity(rest.len());
50        while i < bytes.len() {
51            let b = bytes[i];
52            if b == b'"' {
53                // End of msg= field.
54                return Some(out);
55            }
56            if b == b'\\' && i + 1 < bytes.len() {
57                let next = bytes[i + 1];
58                match next {
59                    b'"' => out.push('"'),
60                    b'\\' => out.push('\\'),
61                    // Other escapes (`\n`, `\r`, `\t`, `\uXXXX`) are
62                    // PART of the JSON content — keep them verbatim so
63                    // serde_json::from_str interprets them.
64                    other => {
65                        out.push('\\');
66                        out.push(other as char);
67                    }
68                }
69                i += 2;
70                continue;
71            }
72            // Non-ASCII multi-byte UTF-8 codepoint or plain ASCII char.
73            // `rest` is a `&str` so we can rely on UTF-8 boundaries.
74            let ch_start = i;
75            // Advance i past the codepoint.
76            i += 1;
77            while i < bytes.len() && (bytes[i] & 0b1100_0000) == 0b1000_0000 {
78                i += 1;
79            }
80            out.push_str(&rest[ch_start..i]);
81        }
82        // Reached EOL without a closing quote — return what we have so
83        // the downstream parser can decide whether to keep it.
84        if out.is_empty() {
85            None
86        } else {
87            Some(out)
88        }
89    } else {
90        // Raw form: rest of the line is the JSON, possibly with trailing
91        // whitespace. No escape processing needed.
92        let trimmed = rest.trim();
93        if trimmed.is_empty() {
94            None
95        } else {
96            Some(trimmed.to_string())
97        }
98    }
99}
100
101/// Extract `MOCKFORGE_EXCHANGE:` JSON payload from a k6 output line (--export-requests).
102fn extract_exchange_json(line: &str) -> Option<String> {
103    extract_mockforge_marker_json(line, "MOCKFORGE_EXCHANGE:")
104}
105
106/// Extract `MOCKFORGE_FAILURE:` JSON payload from a k6 output line.
107fn extract_failure_json(line: &str) -> Option<String> {
108    extract_mockforge_marker_json(line, "MOCKFORGE_FAILURE:")
109}
110
111/// Round 47 (#79) — extract `MOCKFORGE_NETWORK_EVENT:` JSON payload.
112/// Emitted by the k6 captureExchange when `res.status === 0`, capturing
113/// the wire-level failure with a classified `kind`.
114fn extract_network_event_json(line: &str) -> Option<String> {
115    extract_mockforge_marker_json(line, "MOCKFORGE_NETWORK_EVENT:")
116}
117
118/// k6 executor
119pub struct K6Executor {
120    k6_path: String,
121    /// Comma-joined IPs/ranges/CIDRs forwarded to `k6 run --local-ips`.
122    /// Empty → flag omitted. Populated by callers that pass through the
123    /// CLI's `--source-ip`; lets a VU make requests from one of several
124    /// bound interfaces (k6 supports this natively, contrary to my
125    /// round-22 warning).
126    local_ips: String,
127}
128
129impl K6Executor {
130    /// Create a new k6 executor
131    pub fn new() -> Result<Self> {
132        let k6_path = which::which("k6")
133            .map_err(|_| BenchError::K6NotFound)?
134            .to_string_lossy()
135            .to_string();
136
137        Ok(Self {
138            k6_path,
139            local_ips: String::new(),
140        })
141    }
142
143    /// Set the `--local-ips` value for subsequent k6 invocations.
144    /// Accepts a comma-joined list of IPs, ranges (`10.0.0.1-10.0.0.5`),
145    /// and/or CIDRs (`192.168.0.0/24`) - same syntax k6 expects.
146    pub fn with_local_ips(mut self, local_ips: impl Into<String>) -> Self {
147        self.local_ips = local_ips.into();
148        self
149    }
150
151    /// Check if k6 is installed
152    pub fn is_k6_installed() -> bool {
153        which::which("k6").is_ok()
154    }
155
156    /// Get k6 version
157    pub async fn get_version(&self) -> Result<String> {
158        let output = TokioCommand::new(&self.k6_path)
159            .arg("version")
160            .output()
161            .await
162            .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
163
164        if !output.status.success() {
165            return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
166        }
167
168        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
169    }
170
171    /// Execute a k6 script.
172    ///
173    /// `api_port` — when set, overrides k6's default API server address (`localhost:6565`)
174    /// to `localhost:<api_port>`. This prevents "address already in use" errors when
175    /// running multiple k6 instances in parallel (e.g., multi-target bench).
176    /// Pass `None` for single-target runs (uses k6's default).
177    pub async fn execute(
178        &self,
179        script_path: &Path,
180        output_dir: Option<&Path>,
181        verbose: bool,
182    ) -> Result<K6Results> {
183        self.execute_with_port(script_path, output_dir, verbose, None).await
184    }
185
186    /// Execute a k6 script with an optional custom API server port.
187    pub async fn execute_with_port(
188        &self,
189        script_path: &Path,
190        output_dir: Option<&Path>,
191        verbose: bool,
192        api_port: Option<u16>,
193    ) -> Result<K6Results> {
194        println!("Starting load test...\n");
195
196        let mut cmd = TokioCommand::new(&self.k6_path);
197        cmd.arg("run");
198
199        // When running multiple k6 instances in parallel, each needs its own API server port
200        // to avoid "bind: address already in use" on the default port 6565.
201        if let Some(port) = api_port {
202            cmd.arg("--address").arg(format!("localhost:{}", port));
203        }
204
205        // `--local-ips` rotates each VU through a pool of source IPs that
206        // must already be bound on the host (CIDRs/ranges accepted). This
207        // gives the k6 path the same source-IP coverage as the native
208        // self-test driver's `--source-ip`.
209        if !self.local_ips.is_empty() {
210            cmd.arg("--local-ips").arg(&self.local_ips);
211        }
212
213        // summary.json is written by the k6 script's handleSummary() function
214        // (relative to CWD, set to output_dir below). We no longer use
215        // --summary-export as it's deprecated in newer k6 versions and
216        // conflicts with handleSummary when both try to write the same file.
217
218        // Add verbosity
219        if verbose {
220            cmd.arg("--verbose");
221        }
222
223        // Use absolute path for the script so it's found regardless of CWD.
224        let abs_script =
225            std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
226        cmd.arg(&abs_script);
227
228        // Set working directory to output dir so handleSummary's relative
229        // "summary.json" path lands next to the script.
230        if let Some(dir) = output_dir {
231            cmd.current_dir(dir);
232        }
233
234        cmd.stdout(Stdio::piped());
235        cmd.stderr(Stdio::piped());
236
237        let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
238
239        let stdout = child
240            .stdout
241            .take()
242            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
243
244        let stderr = child
245            .stderr
246            .take()
247            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
248
249        // Stream output
250        let stdout_reader = BufReader::new(stdout);
251        let stderr_reader = BufReader::new(stderr);
252
253        let mut stdout_lines = stdout_reader.lines();
254        let mut stderr_lines = stderr_reader.lines();
255
256        // Create progress indicator
257        let spinner = ProgressBar::new_spinner();
258        spinner.set_style(
259            ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
260        );
261        spinner.set_message("Running load test...");
262
263        // Collect failure details from k6's console.log output
264        // k6 may emit console.log to either stdout or stderr depending on version/config
265        let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
266            Arc::new(tokio::sync::Mutex::new(Vec::new()));
267        let fd_stdout = Arc::clone(&failure_details);
268        let fd_stderr = Arc::clone(&failure_details);
269
270        // Collect request/response exchanges for --export-requests
271        let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
272            Arc::new(tokio::sync::Mutex::new(Vec::new()));
273        let ex_stdout = Arc::clone(&exchange_details);
274        let ex_stderr = Arc::clone(&exchange_details);
275
276        // Round 47 (#79) — collect wire-level network events the
277        // k6 script emits on status=0 (connect / tls / timeout). Same
278        // shape as the native + self-test sinks so we can write a
279        // unified `conformance-network-events.json`.
280        let network_events: Arc<tokio::sync::Mutex<Vec<String>>> =
281            Arc::new(tokio::sync::Mutex::new(Vec::new()));
282        let ne_stdout = Arc::clone(&network_events);
283        let ne_stderr = Arc::clone(&network_events);
284
285        // Collect all k6 output for saving to a log file
286        let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
287            Arc::new(tokio::sync::Mutex::new(Vec::new()));
288        let log_stdout = Arc::clone(&log_lines);
289        let log_stderr = Arc::clone(&log_lines);
290
291        // Read stdout lines, capturing MOCKFORGE_FAILURE / MOCKFORGE_EXCHANGE / MOCKFORGE_NETWORK_EVENT markers
292        let stdout_handle = tokio::spawn(async move {
293            while let Ok(Some(line)) = stdout_lines.next_line().await {
294                log_stdout.lock().await.push(format!("[stdout] {}", line));
295                if let Some(json_str) = extract_failure_json(&line) {
296                    fd_stdout.lock().await.push(json_str);
297                } else if let Some(json_str) = extract_exchange_json(&line) {
298                    ex_stdout.lock().await.push(json_str);
299                } else if let Some(json_str) = extract_network_event_json(&line) {
300                    ne_stdout.lock().await.push(json_str);
301                } else {
302                    spinner.set_message(line.clone());
303                    if !line.is_empty() && !line.contains("running") && !line.contains("default") {
304                        println!("{}", line);
305                    }
306                }
307            }
308            spinner.finish_and_clear();
309        });
310
311        // Read stderr lines, capturing MOCKFORGE_FAILURE / MOCKFORGE_EXCHANGE / MOCKFORGE_NETWORK_EVENT markers
312        let stderr_handle = tokio::spawn(async move {
313            while let Ok(Some(line)) = stderr_lines.next_line().await {
314                if !line.is_empty() {
315                    log_stderr.lock().await.push(format!("[stderr] {}", line));
316                    if let Some(json_str) = extract_failure_json(&line) {
317                        fd_stderr.lock().await.push(json_str);
318                    } else if let Some(json_str) = extract_exchange_json(&line) {
319                        ex_stderr.lock().await.push(json_str);
320                    } else if let Some(json_str) = extract_network_event_json(&line) {
321                        ne_stderr.lock().await.push(json_str);
322                    } else {
323                        eprintln!("{}", line);
324                    }
325                }
326            }
327        });
328
329        // Wait for completion
330        let status =
331            child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
332
333        // Wait for both reader tasks to finish processing all lines
334        let _ = stdout_handle.await;
335        let _ = stderr_handle.await;
336
337        // k6 exit code 99 = thresholds crossed. The test DID run and summary.json
338        // should still be present. Only treat non-99 failures as hard errors.
339        let exit_code = status.code().unwrap_or(-1);
340        if !status.success() && exit_code != 99 {
341            return Err(BenchError::K6ExecutionFailed(format!(
342                "k6 exited with status: {}",
343                status
344            )));
345        }
346        if exit_code == 99 {
347            tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
348        }
349
350        // Write failure details to file if any were captured
351        if let Some(dir) = output_dir {
352            let details = failure_details.lock().await;
353            if !details.is_empty() {
354                let failure_path = dir.join("conformance-failure-details.json");
355                let parsed: Vec<serde_json::Value> =
356                    details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
357                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
358                    let _ = std::fs::write(&failure_path, json);
359                }
360            }
361
362            // Write exchange details (--export-requests) if any were captured
363            let exchanges = exchange_details.lock().await;
364            if !exchanges.is_empty() {
365                let exchange_path = dir.join("conformance-requests.json");
366                let parsed: Vec<serde_json::Value> =
367                    exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
368                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
369                    let _ = std::fs::write(&exchange_path, json);
370                    tracing::info!(
371                        "Exported {} request/response pairs to {}",
372                        parsed.len(),
373                        exchange_path.display()
374                    );
375                }
376            }
377
378            // Round 47 (#79) — write the wire-level events sink. We
379            // ALWAYS write the file (empty array when nothing failed)
380            // so a caller can tell "everything succeeded" from "nobody
381            // looked" at a glance.
382            let net_events = network_events.lock().await;
383            let net_path = dir.join("conformance-network-events.json");
384            let parsed: Vec<serde_json::Value> =
385                net_events.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
386            if let Ok(json) = serde_json::to_string_pretty(&parsed) {
387                let _ = std::fs::write(&net_path, json);
388                if !parsed.is_empty() {
389                    tracing::warn!(
390                        "Recorded {} wire-level network event(s) to {}",
391                        parsed.len(),
392                        net_path.display()
393                    );
394                }
395            }
396
397            // Save full k6 output to a log file for debugging
398            let lines = log_lines.lock().await;
399            if !lines.is_empty() {
400                let log_path = dir.join("k6-output.log");
401                let _ = std::fs::write(&log_path, lines.join("\n"));
402                println!("k6 output log saved to: {}", log_path.display());
403            }
404        }
405
406        // Parse results if output directory was specified
407        let results = if let Some(dir) = output_dir {
408            Self::parse_results(dir)?
409        } else {
410            K6Results::default()
411        };
412
413        Ok(results)
414    }
415
416    /// Parse k6 results from JSON output
417    fn parse_results(output_dir: &Path) -> Result<K6Results> {
418        let summary_path = output_dir.join("summary.json");
419
420        if !summary_path.exists() {
421            return Ok(K6Results::default());
422        }
423
424        let content = std::fs::read_to_string(summary_path)
425            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
426
427        let json: serde_json::Value = serde_json::from_str(&content)
428            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
429
430        let duration_values = &json["metrics"]["http_req_duration"]["values"];
431
432        let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
433        let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
434        let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
435
436        // Issue #79 (round 5) — surface TCP connect / TLS handshake stats and
437        // a connection-rate count for `--cps` runs.
438        //
439        // Round 6 follow-up: k6's `http_req_connecting` Trend doesn't expose a
440        // `count` field in summary.json (only avg/min/med/max/p90/p95), so we
441        // can't use it for "connections opened". The template now feeds a
442        // dedicated Counter, `mockforge_connections_opened`, every time a
443        // request's `res.timings.connecting > 0`. That gives us an accurate
444        // count for both `--cps` (≈ total_requests) and pooled-reuse (≈ vus_max)
445        // runs. The Trend is still useful for the avg/max timing display.
446        let tcp_connecting = &json["metrics"]["http_req_connecting"]["values"];
447        let tls_handshake = &json["metrics"]["http_req_tls_handshaking"]["values"];
448        let mf_conns_opened = &json["metrics"]["mockforge_connections_opened"]["values"]["count"];
449
450        Ok(K6Results {
451            total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
452            // k6 Rate metric: `passes` = count of non-zero values.
453            // For http_req_failed, non-zero means the request failed.
454            // So `passes` = failed request count, `fails` = successful request count.
455            failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
456                .as_u64()
457                .unwrap_or(0),
458            avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
459            p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
460            p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
461            rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
462            vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
463            min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
464            max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
465            med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
466            p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
467            server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
468            server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
469            server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
470            server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
471            server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
472            server_reported_faults: server_fault.as_u64().unwrap_or(0),
473            // Counter from the template, not the Trend's count (which is
474            // absent in k6 summary JSON).
475            tcp_connect_samples: mf_conns_opened.as_u64().unwrap_or(0),
476            tcp_connect_avg_ms: tcp_connecting["avg"].as_f64().unwrap_or(0.0),
477            tcp_connect_max_ms: tcp_connecting["max"].as_f64().unwrap_or(0.0),
478            // TLS handshake Trend has no `count` either; gate display on avg>0.
479            tls_handshake_samples: if tls_handshake["avg"].as_f64().unwrap_or(0.0) > 0.0 {
480                // Use connection count as a proxy — every new TLS session
481                // requires a handshake.
482                mf_conns_opened.as_u64().unwrap_or(0)
483            } else {
484                0
485            },
486            tls_handshake_avg_ms: tls_handshake["avg"].as_f64().unwrap_or(0.0),
487            tls_handshake_max_ms: tls_handshake["max"].as_f64().unwrap_or(0.0),
488            iterations_completed: json["metrics"]["iterations"]["values"]["count"]
489                .as_u64()
490                .unwrap_or(0),
491        })
492    }
493}
494
495impl Default for K6Executor {
496    fn default() -> Self {
497        Self::new().expect("k6 not found")
498    }
499}
500
501/// k6 test results
502#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
503pub struct K6Results {
504    pub total_requests: u64,
505    pub failed_requests: u64,
506    pub avg_duration_ms: f64,
507    pub p95_duration_ms: f64,
508    pub p99_duration_ms: f64,
509    pub rps: f64,
510    pub vus_max: u32,
511    pub min_duration_ms: f64,
512    pub max_duration_ms: f64,
513    pub med_duration_ms: f64,
514    pub p90_duration_ms: f64,
515    /// Issue #79 — client-side visibility into MockForge-injected latency,
516    /// parsed from the `X-Mockforge-Injected-Latency-Ms` response header that
517    /// the chaos middleware sets. Zero when chaos isn't firing or the target
518    /// isn't MockForge.
519    #[serde(default)]
520    pub server_injected_latency_samples: u64,
521    #[serde(default)]
522    pub server_injected_latency_avg_ms: f64,
523    #[serde(default)]
524    pub server_injected_latency_max_ms: f64,
525    #[serde(default)]
526    pub server_injected_jitter_samples: u64,
527    #[serde(default)]
528    pub server_injected_jitter_avg_ms: f64,
529    /// Count of responses that carried an `X-Mockforge-Fault` header.
530    #[serde(default)]
531    pub server_reported_faults: u64,
532    /// Issue #79 (round 5) — TCP connect samples / timing. With `--cps`
533    /// (`noConnectionReuse: true`) k6 records one connect per request, so
534    /// `tcp_connect_samples` equals connections opened. Without `--cps` this
535    /// is typically a small count (k6 reuses pooled connections), so it tells
536    /// you whether reuse was actually happening.
537    #[serde(default)]
538    pub tcp_connect_samples: u64,
539    #[serde(default)]
540    pub tcp_connect_avg_ms: f64,
541    #[serde(default)]
542    pub tcp_connect_max_ms: f64,
543    /// TLS handshake samples / timing — same shape as TCP connect, but only
544    /// non-zero for HTTPS targets.
545    #[serde(default)]
546    pub tls_handshake_samples: u64,
547    #[serde(default)]
548    pub tls_handshake_avg_ms: f64,
549    #[serde(default)]
550    pub tls_handshake_max_ms: f64,
551    /// Issue #79 round 10 — k6 iteration counter from `iterations.values.count`.
552    /// For `constant-arrival-rate` (`--rps`), this is the number of full
553    /// iterations completed within the duration. When `iterations × num_ops`
554    /// is much less than `total_requests`, mid-iteration cancellation truncated
555    /// the run and not every operation in the spec was exercised.
556    #[serde(default)]
557    pub iterations_completed: u64,
558}
559
560impl K6Results {
561    /// Get error rate as a percentage
562    pub fn error_rate(&self) -> f64 {
563        if self.total_requests == 0 {
564            return 0.0;
565        }
566        (self.failed_requests as f64 / self.total_requests as f64) * 100.0
567    }
568
569    /// Get success rate as a percentage
570    pub fn success_rate(&self) -> f64 {
571        100.0 - self.error_rate()
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use super::*;
578
579    #[test]
580    fn test_k6_results_error_rate() {
581        let results = K6Results {
582            total_requests: 100,
583            failed_requests: 5,
584            avg_duration_ms: 100.0,
585            p95_duration_ms: 200.0,
586            p99_duration_ms: 300.0,
587            ..Default::default()
588        };
589
590        assert_eq!(results.error_rate(), 5.0);
591        assert_eq!(results.success_rate(), 95.0);
592    }
593
594    #[test]
595    fn test_k6_results_zero_requests() {
596        let results = K6Results::default();
597        assert_eq!(results.error_rate(), 0.0);
598    }
599
600    #[test]
601    fn test_extract_failure_json_raw() {
602        let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
603        let result = extract_failure_json(line).unwrap();
604        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
605        assert_eq!(parsed["check"], "test");
606    }
607
608    #[test]
609    fn test_extract_failure_json_logfmt() {
610        let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
611        let result = extract_failure_json(line).unwrap();
612        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
613        assert_eq!(parsed["check"], "test");
614        assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
615    }
616
617    #[test]
618    fn test_extract_failure_json_no_marker() {
619        assert!(extract_failure_json("just a regular log line").is_none());
620    }
621
622    /// Round 46 (#79) — regression: Srikanth's multipart upload landed
623    /// `[]` in `conformance-requests.json` because the old
624    /// `replace("\\\\","\\").replace("\\\"","\"")` chain misparsed
625    /// adjacent backslashes inside the JSON body (binary multipart
626    /// bytes encoded as `\\u00XX` etc.). Pin both shapes here.
627    #[test]
628    fn test_extract_exchange_logfmt_with_backslash_escapes() {
629        // A JSON body that contains a JSON-encoded `` (one escape
630        // sequence the validator survives). Logfmt wraps it: each `\`
631        // becomes `\\`, each `"` becomes `\"`.
632        let line = r#"time="2026-06-26T10:00:00Z" level=info msg="MOCKFORGE_EXCHANGE:{\"check\":\"u\",\"request\":{\"body\":\"--bnd\\r\\n\\u001a\"}}" source=console"#;
633        let result = extract_exchange_json(line).unwrap();
634        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
635        assert_eq!(parsed["check"], "u");
636        // The unescape preserves the JSON's `\r\n` and `` so the
637        // downstream consumer can interpret them as JSON escapes.
638        assert_eq!(parsed["request"]["body"], "--bnd\r\n\u{001a}");
639    }
640
641    #[test]
642    fn test_extract_exchange_raw_no_logfmt_wrapping() {
643        let line =
644            r#"MOCKFORGE_EXCHANGE:{"check":"x","request":{"body":""},"response":{"status":200}}"#;
645        let result = extract_exchange_json(line).unwrap();
646        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
647        assert_eq!(parsed["check"], "x");
648        assert_eq!(parsed["response"]["status"], 200);
649    }
650
651    /// The end of `msg="..."` is a single unescaped `"`, not the old
652    /// fixed-string `" source=console`. If k6 ever appends another
653    /// logfmt field (or omits source=), we still get the JSON out.
654    #[test]
655    fn test_extract_exchange_logfmt_tolerates_extra_trailing_fields() {
656        let line = r#"msg="MOCKFORGE_EXCHANGE:{\"check\":\"t\"}" source=console vu=1 iter=0"#;
657        let result = extract_exchange_json(line).unwrap();
658        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
659        assert_eq!(parsed["check"], "t");
660    }
661
662    /// Round 46 — JSON-encoded backslash inside a JSON string (`\\u00XX`
663    /// in the JSON, `\\\\u00XX` in logfmt) must round-trip cleanly.
664    /// The naive `.replace` chain choked on this exact pattern.
665    #[test]
666    fn test_extract_exchange_double_backslash_followed_by_quote() {
667        // JSON content: `\\"x"` is `\` then `"x"`. Logfmt:
668        // `\\\\\"x\"` (4 backslashes + escaped quote + x + escaped quote).
669        let line = r#"msg="MOCKFORGE_EXCHANGE:{\"k\":\"a\\\\\\\"x\\\"\"}" source=console"#;
670        let result = extract_exchange_json(line).unwrap();
671        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
672        assert_eq!(parsed["k"], r#"a\"x""#);
673    }
674}