Skip to main content

mockforge_bench/
executor.rs

1//! k6 execution and output handling
2
3use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
/// Extract a `MOCKFORGE_<KIND>:` JSON payload from a k6 output line.
///
/// k6 emits these via `console.log`, which goes through one of two paths
/// depending on the runner config:
/// - **Raw**: `MOCKFORGE_EXCHANGE:{"check":"...", ...}` straight to stdout.
/// - **Logfmt**: `time="..." level=info msg="MOCKFORGE_EXCHANGE:{...}" source=console`
///   where the JSON's `"` are escaped as `\"` and `\` as `\\` so it fits
///   inside the `msg="..."` field.
///
/// # Arguments
///
/// * `line` - one line of k6 output.
/// * `marker` - the literal marker to look for, e.g. `"MOCKFORGE_EXCHANGE:"`.
///   Only the first occurrence in `line` is considered.
///
/// # Returns
///
/// * `None` when the marker is absent, or when nothing follows it (raw form:
///   only whitespace; logfmt form: end of line with no content and no
///   closing quote).
/// * Raw form: the rest of the line, trimmed.
/// * Logfmt form: the unescaped content of the `msg="..."` field up to its
///   closing unescaped `"`. If the line ends before a closing quote, whatever
///   was collected is returned so the downstream JSON parser can decide
///   whether it is usable.
///
/// # History
///
/// Round 46 (#79): a multipart upload landed `[]` in
/// `conformance-requests.json` even though `MOCKFORGE_EXCHANGE:` was present
/// in the k6 log. The earlier parser used a
/// `replace("\\\\", "\\").replace("\\\"", "\"")` chain, which mangled
/// sequences such as a literal backslash followed by a literal quote. The
/// unescape is therefore a single left-to-right walk that consumes exactly
/// one logfmt escape at a time, and the end of the field is found by
/// scanning for the closing quote rather than a fixed `" source=console`
/// suffix, so extra trailing logfmt fields are tolerated.
///
/// The walk operates on `char`s, not bytes: a backslash followed by a
/// multi-byte UTF-8 character is copied through unchanged instead of
/// splitting the codepoint (which previously caused a slicing panic).
fn extract_mockforge_marker_json(line: &str, marker: &str) -> Option<String> {
    let start = line.find(marker)?;
    let rest = &line[start + marker.len()..];

    // Logfmt-wrapped form: the `msg="` opener sits directly before the
    // marker. (Plain `msg=MOCKFORGE_...` would also be valid logfmt for a
    // value with no spaces, but k6 always quote-wraps.)
    if !line[..start].ends_with("msg=\"") {
        // Raw form: the rest of the line is the JSON, possibly with
        // surrounding whitespace. No escape processing needed.
        let trimmed = rest.trim();
        return if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        };
    }

    let mut out = String::with_capacity(rest.len());
    let mut chars = rest.chars().peekable();
    while let Some(ch) = chars.next() {
        match ch {
            // An unescaped quote terminates the msg= field.
            '"' => return Some(out),
            '\\' => {
                // `\"` and `\\` are logfmt escapes for one quote / one
                // backslash of JSON content. Anything else after a
                // backslash (`\n`, `\r`, `\t`, `\uXXXX`, ...) is part of the
                // JSON itself: keep the backslash and let the next loop
                // iteration copy the following character verbatim.
                match chars.next_if(|next| matches!(next, '"' | '\\')) {
                    Some(escaped) => out.push(escaped),
                    None => out.push('\\'),
                }
            }
            other => out.push(other),
        }
    }

    // Reached end of line without a closing quote: return what we have so
    // the downstream parser can decide whether to keep it.
    if out.is_empty() {
        None
    } else {
        Some(out)
    }
}
100
101/// Extract `MOCKFORGE_EXCHANGE:` JSON payload from a k6 output line (--export-requests).
102fn extract_exchange_json(line: &str) -> Option<String> {
103    extract_mockforge_marker_json(line, "MOCKFORGE_EXCHANGE:")
104}
105
106/// Extract `MOCKFORGE_FAILURE:` JSON payload from a k6 output line.
107fn extract_failure_json(line: &str) -> Option<String> {
108    extract_mockforge_marker_json(line, "MOCKFORGE_FAILURE:")
109}
110
/// Runs k6 scripts through the `k6` binary and collects their output.
pub struct K6Executor {
    /// Path of the `k6` executable that gets invoked.
    k6_path: String,
    /// Comma-joined IPs/ranges/CIDRs forwarded to `k6 run --local-ips`.
    ///
    /// When empty the flag is omitted. Callers that pass through the CLI's
    /// `--source-ip` populate it so that a VU can issue requests from one of
    /// several bound interfaces.
    local_ips: String,
}
121
122impl K6Executor {
123    /// Create a new k6 executor
124    pub fn new() -> Result<Self> {
125        let k6_path = which::which("k6")
126            .map_err(|_| BenchError::K6NotFound)?
127            .to_string_lossy()
128            .to_string();
129
130        Ok(Self {
131            k6_path,
132            local_ips: String::new(),
133        })
134    }
135
136    /// Set the `--local-ips` value for subsequent k6 invocations.
137    /// Accepts a comma-joined list of IPs, ranges (`10.0.0.1-10.0.0.5`),
138    /// and/or CIDRs (`192.168.0.0/24`) - same syntax k6 expects.
139    pub fn with_local_ips(mut self, local_ips: impl Into<String>) -> Self {
140        self.local_ips = local_ips.into();
141        self
142    }
143
144    /// Check if k6 is installed
145    pub fn is_k6_installed() -> bool {
146        which::which("k6").is_ok()
147    }
148
149    /// Get k6 version
150    pub async fn get_version(&self) -> Result<String> {
151        let output = TokioCommand::new(&self.k6_path)
152            .arg("version")
153            .output()
154            .await
155            .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
156
157        if !output.status.success() {
158            return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
159        }
160
161        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
162    }
163
164    /// Execute a k6 script.
165    ///
166    /// `api_port` — when set, overrides k6's default API server address (`localhost:6565`)
167    /// to `localhost:<api_port>`. This prevents "address already in use" errors when
168    /// running multiple k6 instances in parallel (e.g., multi-target bench).
169    /// Pass `None` for single-target runs (uses k6's default).
170    pub async fn execute(
171        &self,
172        script_path: &Path,
173        output_dir: Option<&Path>,
174        verbose: bool,
175    ) -> Result<K6Results> {
176        self.execute_with_port(script_path, output_dir, verbose, None).await
177    }
178
179    /// Execute a k6 script with an optional custom API server port.
180    pub async fn execute_with_port(
181        &self,
182        script_path: &Path,
183        output_dir: Option<&Path>,
184        verbose: bool,
185        api_port: Option<u16>,
186    ) -> Result<K6Results> {
187        println!("Starting load test...\n");
188
189        let mut cmd = TokioCommand::new(&self.k6_path);
190        cmd.arg("run");
191
192        // When running multiple k6 instances in parallel, each needs its own API server port
193        // to avoid "bind: address already in use" on the default port 6565.
194        if let Some(port) = api_port {
195            cmd.arg("--address").arg(format!("localhost:{}", port));
196        }
197
198        // `--local-ips` rotates each VU through a pool of source IPs that
199        // must already be bound on the host (CIDRs/ranges accepted). This
200        // gives the k6 path the same source-IP coverage as the native
201        // self-test driver's `--source-ip`.
202        if !self.local_ips.is_empty() {
203            cmd.arg("--local-ips").arg(&self.local_ips);
204        }
205
206        // summary.json is written by the k6 script's handleSummary() function
207        // (relative to CWD, set to output_dir below). We no longer use
208        // --summary-export as it's deprecated in newer k6 versions and
209        // conflicts with handleSummary when both try to write the same file.
210
211        // Add verbosity
212        if verbose {
213            cmd.arg("--verbose");
214        }
215
216        // Use absolute path for the script so it's found regardless of CWD.
217        let abs_script =
218            std::fs::canonicalize(script_path).unwrap_or_else(|_| script_path.to_path_buf());
219        cmd.arg(&abs_script);
220
221        // Set working directory to output dir so handleSummary's relative
222        // "summary.json" path lands next to the script.
223        if let Some(dir) = output_dir {
224            cmd.current_dir(dir);
225        }
226
227        cmd.stdout(Stdio::piped());
228        cmd.stderr(Stdio::piped());
229
230        let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
231
232        let stdout = child
233            .stdout
234            .take()
235            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
236
237        let stderr = child
238            .stderr
239            .take()
240            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
241
242        // Stream output
243        let stdout_reader = BufReader::new(stdout);
244        let stderr_reader = BufReader::new(stderr);
245
246        let mut stdout_lines = stdout_reader.lines();
247        let mut stderr_lines = stderr_reader.lines();
248
249        // Create progress indicator
250        let spinner = ProgressBar::new_spinner();
251        spinner.set_style(
252            ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
253        );
254        spinner.set_message("Running load test...");
255
256        // Collect failure details from k6's console.log output
257        // k6 may emit console.log to either stdout or stderr depending on version/config
258        let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
259            Arc::new(tokio::sync::Mutex::new(Vec::new()));
260        let fd_stdout = Arc::clone(&failure_details);
261        let fd_stderr = Arc::clone(&failure_details);
262
263        // Collect request/response exchanges for --export-requests
264        let exchange_details: Arc<tokio::sync::Mutex<Vec<String>>> =
265            Arc::new(tokio::sync::Mutex::new(Vec::new()));
266        let ex_stdout = Arc::clone(&exchange_details);
267        let ex_stderr = Arc::clone(&exchange_details);
268
269        // Collect all k6 output for saving to a log file
270        let log_lines: Arc<tokio::sync::Mutex<Vec<String>>> =
271            Arc::new(tokio::sync::Mutex::new(Vec::new()));
272        let log_stdout = Arc::clone(&log_lines);
273        let log_stderr = Arc::clone(&log_lines);
274
275        // Read stdout lines, capturing MOCKFORGE_FAILURE and MOCKFORGE_EXCHANGE markers
276        let stdout_handle = tokio::spawn(async move {
277            while let Ok(Some(line)) = stdout_lines.next_line().await {
278                log_stdout.lock().await.push(format!("[stdout] {}", line));
279                if let Some(json_str) = extract_failure_json(&line) {
280                    fd_stdout.lock().await.push(json_str);
281                } else if let Some(json_str) = extract_exchange_json(&line) {
282                    ex_stdout.lock().await.push(json_str);
283                } else {
284                    spinner.set_message(line.clone());
285                    if !line.is_empty() && !line.contains("running") && !line.contains("default") {
286                        println!("{}", line);
287                    }
288                }
289            }
290            spinner.finish_and_clear();
291        });
292
293        // Read stderr lines, capturing MOCKFORGE_FAILURE and MOCKFORGE_EXCHANGE markers
294        let stderr_handle = tokio::spawn(async move {
295            while let Ok(Some(line)) = stderr_lines.next_line().await {
296                if !line.is_empty() {
297                    log_stderr.lock().await.push(format!("[stderr] {}", line));
298                    if let Some(json_str) = extract_failure_json(&line) {
299                        fd_stderr.lock().await.push(json_str);
300                    } else if let Some(json_str) = extract_exchange_json(&line) {
301                        ex_stderr.lock().await.push(json_str);
302                    } else {
303                        eprintln!("{}", line);
304                    }
305                }
306            }
307        });
308
309        // Wait for completion
310        let status =
311            child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
312
313        // Wait for both reader tasks to finish processing all lines
314        let _ = stdout_handle.await;
315        let _ = stderr_handle.await;
316
317        // k6 exit code 99 = thresholds crossed. The test DID run and summary.json
318        // should still be present. Only treat non-99 failures as hard errors.
319        let exit_code = status.code().unwrap_or(-1);
320        if !status.success() && exit_code != 99 {
321            return Err(BenchError::K6ExecutionFailed(format!(
322                "k6 exited with status: {}",
323                status
324            )));
325        }
326        if exit_code == 99 {
327            tracing::warn!("k6 thresholds crossed (exit code 99) — results will still be parsed");
328        }
329
330        // Write failure details to file if any were captured
331        if let Some(dir) = output_dir {
332            let details = failure_details.lock().await;
333            if !details.is_empty() {
334                let failure_path = dir.join("conformance-failure-details.json");
335                let parsed: Vec<serde_json::Value> =
336                    details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
337                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
338                    let _ = std::fs::write(&failure_path, json);
339                }
340            }
341
342            // Write exchange details (--export-requests) if any were captured
343            let exchanges = exchange_details.lock().await;
344            if !exchanges.is_empty() {
345                let exchange_path = dir.join("conformance-requests.json");
346                let parsed: Vec<serde_json::Value> =
347                    exchanges.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
348                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
349                    let _ = std::fs::write(&exchange_path, json);
350                    tracing::info!(
351                        "Exported {} request/response pairs to {}",
352                        parsed.len(),
353                        exchange_path.display()
354                    );
355                }
356            }
357
358            // Save full k6 output to a log file for debugging
359            let lines = log_lines.lock().await;
360            if !lines.is_empty() {
361                let log_path = dir.join("k6-output.log");
362                let _ = std::fs::write(&log_path, lines.join("\n"));
363                println!("k6 output log saved to: {}", log_path.display());
364            }
365        }
366
367        // Parse results if output directory was specified
368        let results = if let Some(dir) = output_dir {
369            Self::parse_results(dir)?
370        } else {
371            K6Results::default()
372        };
373
374        Ok(results)
375    }
376
377    /// Parse k6 results from JSON output
378    fn parse_results(output_dir: &Path) -> Result<K6Results> {
379        let summary_path = output_dir.join("summary.json");
380
381        if !summary_path.exists() {
382            return Ok(K6Results::default());
383        }
384
385        let content = std::fs::read_to_string(summary_path)
386            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
387
388        let json: serde_json::Value = serde_json::from_str(&content)
389            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
390
391        let duration_values = &json["metrics"]["http_req_duration"]["values"];
392
393        let server_latency = &json["metrics"]["mockforge_server_injected_latency_ms"]["values"];
394        let server_jitter = &json["metrics"]["mockforge_server_injected_jitter_ms"]["values"];
395        let server_fault = &json["metrics"]["mockforge_server_fault_total"]["values"]["count"];
396
397        // Issue #79 (round 5) — surface TCP connect / TLS handshake stats and
398        // a connection-rate count for `--cps` runs.
399        //
400        // Round 6 follow-up: k6's `http_req_connecting` Trend doesn't expose a
401        // `count` field in summary.json (only avg/min/med/max/p90/p95), so we
402        // can't use it for "connections opened". The template now feeds a
403        // dedicated Counter, `mockforge_connections_opened`, every time a
404        // request's `res.timings.connecting > 0`. That gives us an accurate
405        // count for both `--cps` (≈ total_requests) and pooled-reuse (≈ vus_max)
406        // runs. The Trend is still useful for the avg/max timing display.
407        let tcp_connecting = &json["metrics"]["http_req_connecting"]["values"];
408        let tls_handshake = &json["metrics"]["http_req_tls_handshaking"]["values"];
409        let mf_conns_opened = &json["metrics"]["mockforge_connections_opened"]["values"]["count"];
410
411        Ok(K6Results {
412            total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
413            // k6 Rate metric: `passes` = count of non-zero values.
414            // For http_req_failed, non-zero means the request failed.
415            // So `passes` = failed request count, `fails` = successful request count.
416            failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
417                .as_u64()
418                .unwrap_or(0),
419            avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
420            p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
421            p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
422            rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
423            vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
424            min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
425            max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
426            med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
427            p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
428            server_injected_latency_samples: server_latency["count"].as_u64().unwrap_or(0),
429            server_injected_latency_avg_ms: server_latency["avg"].as_f64().unwrap_or(0.0),
430            server_injected_latency_max_ms: server_latency["max"].as_f64().unwrap_or(0.0),
431            server_injected_jitter_samples: server_jitter["count"].as_u64().unwrap_or(0),
432            server_injected_jitter_avg_ms: server_jitter["avg"].as_f64().unwrap_or(0.0),
433            server_reported_faults: server_fault.as_u64().unwrap_or(0),
434            // Counter from the template, not the Trend's count (which is
435            // absent in k6 summary JSON).
436            tcp_connect_samples: mf_conns_opened.as_u64().unwrap_or(0),
437            tcp_connect_avg_ms: tcp_connecting["avg"].as_f64().unwrap_or(0.0),
438            tcp_connect_max_ms: tcp_connecting["max"].as_f64().unwrap_or(0.0),
439            // TLS handshake Trend has no `count` either; gate display on avg>0.
440            tls_handshake_samples: if tls_handshake["avg"].as_f64().unwrap_or(0.0) > 0.0 {
441                // Use connection count as a proxy — every new TLS session
442                // requires a handshake.
443                mf_conns_opened.as_u64().unwrap_or(0)
444            } else {
445                0
446            },
447            tls_handshake_avg_ms: tls_handshake["avg"].as_f64().unwrap_or(0.0),
448            tls_handshake_max_ms: tls_handshake["max"].as_f64().unwrap_or(0.0),
449            iterations_completed: json["metrics"]["iterations"]["values"]["count"]
450                .as_u64()
451                .unwrap_or(0),
452        })
453    }
454}
455
456impl Default for K6Executor {
457    fn default() -> Self {
458        Self::new().expect("k6 not found")
459    }
460}
461
462/// k6 test results
463#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
464pub struct K6Results {
465    pub total_requests: u64,
466    pub failed_requests: u64,
467    pub avg_duration_ms: f64,
468    pub p95_duration_ms: f64,
469    pub p99_duration_ms: f64,
470    pub rps: f64,
471    pub vus_max: u32,
472    pub min_duration_ms: f64,
473    pub max_duration_ms: f64,
474    pub med_duration_ms: f64,
475    pub p90_duration_ms: f64,
476    /// Issue #79 — client-side visibility into MockForge-injected latency,
477    /// parsed from the `X-Mockforge-Injected-Latency-Ms` response header that
478    /// the chaos middleware sets. Zero when chaos isn't firing or the target
479    /// isn't MockForge.
480    #[serde(default)]
481    pub server_injected_latency_samples: u64,
482    #[serde(default)]
483    pub server_injected_latency_avg_ms: f64,
484    #[serde(default)]
485    pub server_injected_latency_max_ms: f64,
486    #[serde(default)]
487    pub server_injected_jitter_samples: u64,
488    #[serde(default)]
489    pub server_injected_jitter_avg_ms: f64,
490    /// Count of responses that carried an `X-Mockforge-Fault` header.
491    #[serde(default)]
492    pub server_reported_faults: u64,
493    /// Issue #79 (round 5) — TCP connect samples / timing. With `--cps`
494    /// (`noConnectionReuse: true`) k6 records one connect per request, so
495    /// `tcp_connect_samples` equals connections opened. Without `--cps` this
496    /// is typically a small count (k6 reuses pooled connections), so it tells
497    /// you whether reuse was actually happening.
498    #[serde(default)]
499    pub tcp_connect_samples: u64,
500    #[serde(default)]
501    pub tcp_connect_avg_ms: f64,
502    #[serde(default)]
503    pub tcp_connect_max_ms: f64,
504    /// TLS handshake samples / timing — same shape as TCP connect, but only
505    /// non-zero for HTTPS targets.
506    #[serde(default)]
507    pub tls_handshake_samples: u64,
508    #[serde(default)]
509    pub tls_handshake_avg_ms: f64,
510    #[serde(default)]
511    pub tls_handshake_max_ms: f64,
512    /// Issue #79 round 10 — k6 iteration counter from `iterations.values.count`.
513    /// For `constant-arrival-rate` (`--rps`), this is the number of full
514    /// iterations completed within the duration. When `iterations × num_ops`
515    /// is much less than `total_requests`, mid-iteration cancellation truncated
516    /// the run and not every operation in the spec was exercised.
517    #[serde(default)]
518    pub iterations_completed: u64,
519}
520
521impl K6Results {
522    /// Get error rate as a percentage
523    pub fn error_rate(&self) -> f64 {
524        if self.total_requests == 0 {
525            return 0.0;
526        }
527        (self.failed_requests as f64 / self.total_requests as f64) * 100.0
528    }
529
530    /// Get success rate as a percentage
531    pub fn success_rate(&self) -> f64 {
532        100.0 - self.error_rate()
533    }
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    /// Decode an extracted payload, failing the test if it is not valid JSON.
    fn parse_payload(payload: &str) -> serde_json::Value {
        serde_json::from_str(payload).unwrap()
    }

    #[test]
    fn test_k6_results_error_rate() {
        let results = K6Results {
            total_requests: 100,
            failed_requests: 5,
            avg_duration_ms: 100.0,
            p95_duration_ms: 200.0,
            p99_duration_ms: 300.0,
            ..Default::default()
        };

        assert_eq!(results.error_rate(), 5.0);
        assert_eq!(results.success_rate(), 95.0);
    }

    #[test]
    fn test_k6_results_zero_requests() {
        // No requests at all must not divide by zero.
        assert_eq!(K6Results::default().error_rate(), 0.0);
    }

    #[test]
    fn test_extract_failure_json_raw() {
        let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
        let parsed = parse_payload(&extract_failure_json(line).unwrap());
        assert_eq!(parsed["check"], "test");
    }

    #[test]
    fn test_extract_failure_json_logfmt() {
        let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
        let parsed = parse_payload(&extract_failure_json(line).unwrap());
        assert_eq!(parsed["check"], "test");
        assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
    }

    #[test]
    fn test_extract_failure_json_no_marker() {
        assert!(extract_failure_json("just a regular log line").is_none());
    }

    /// Round 46 (#79) — regression: a multipart upload landed `[]` in
    /// `conformance-requests.json` because the old
    /// `replace("\\\\","\\").replace("\\\"","\"")` chain misparsed
    /// adjacent backslashes inside the JSON body (binary multipart
    /// bytes encoded as `\u00XX` etc.).
    #[test]
    fn test_extract_exchange_logfmt_with_backslash_escapes() {
        // The JSON body contains the JSON escapes `\r`, `\n` and `\u001a`.
        // Logfmt wraps it: each `\` becomes `\\`, each `"` becomes `\"`.
        let line = r#"time="2026-06-26T10:00:00Z" level=info msg="MOCKFORGE_EXCHANGE:{\"check\":\"u\",\"request\":{\"body\":\"--bnd\\r\\n\\u001a\"}}" source=console"#;
        let parsed = parse_payload(&extract_exchange_json(line).unwrap());
        assert_eq!(parsed["check"], "u");
        // The unescape keeps the JSON's `\r\n` and `\u001a` escapes intact,
        // so the JSON parser turns them into the actual characters.
        assert_eq!(parsed["request"]["body"], "--bnd\r\n\u{001a}");
    }

    #[test]
    fn test_extract_exchange_raw_no_logfmt_wrapping() {
        let line =
            r#"MOCKFORGE_EXCHANGE:{"check":"x","request":{"body":""},"response":{"status":200}}"#;
        let parsed = parse_payload(&extract_exchange_json(line).unwrap());
        assert_eq!(parsed["check"], "x");
        assert_eq!(parsed["response"]["status"], 200);
    }

    /// The end of `msg="..."` is a single unescaped `"`, not the old
    /// fixed-string `" source=console`. If k6 ever appends another
    /// logfmt field (or omits source=), we still get the JSON out.
    #[test]
    fn test_extract_exchange_logfmt_tolerates_extra_trailing_fields() {
        let line = r#"msg="MOCKFORGE_EXCHANGE:{\"check\":\"t\"}" source=console vu=1 iter=0"#;
        let parsed = parse_payload(&extract_exchange_json(line).unwrap());
        assert_eq!(parsed["check"], "t");
    }

    /// Round 46 — a JSON-escaped backslash directly followed by a
    /// JSON-escaped quote inside a JSON string must round-trip cleanly
    /// through the logfmt unescape. The naive `.replace` chain choked on
    /// this exact pattern.
    #[test]
    fn test_extract_exchange_double_backslash_followed_by_quote() {
        // Decoded string value: `a\"x"`. In the JSON text the backslash and
        // both quotes are escaped, and logfmt then escapes every `\` and `"`
        // of that JSON text once more.
        let line = r#"msg="MOCKFORGE_EXCHANGE:{\"k\":\"a\\\\\\\"x\\\"\"}" source=console"#;
        let parsed = parse_payload(&extract_exchange_json(line).unwrap());
        assert_eq!(parsed["k"], r#"a\"x""#);
    }
}