Skip to main content

mockforge_bench/
executor.rs

1//! k6 execution and output handling
2
3use crate::error::{BenchError, Result};
4use indicatif::{ProgressBar, ProgressStyle};
5use std::path::Path;
6use std::process::Stdio;
7use std::sync::Arc;
8use tokio::io::{AsyncBufReadExt, BufReader};
9use tokio::process::Command as TokioCommand;
10
11/// Extract `MOCKFORGE_FAILURE:` JSON payload from a k6 output line.
12///
13/// k6 may format console.log lines differently depending on output mode:
14/// - Raw: `MOCKFORGE_FAILURE:{...}`
15/// - Logfmt: `time="..." level=info msg="MOCKFORGE_FAILURE:{...}" source=console`
16fn extract_failure_json(line: &str) -> Option<String> {
17    let marker = "MOCKFORGE_FAILURE:";
18    let start = line.find(marker)?;
19    let json_start = start + marker.len();
20    let json_str = &line[json_start..];
21    // Trim trailing `" source=console` if present (k6 logfmt)
22    let json_str = json_str.strip_suffix("\" source=console").unwrap_or(json_str).trim();
23    if json_str.is_empty() {
24        return None;
25    }
26    // k6 logfmt wraps msg in quotes and escapes inner quotes as \" and
27    // backslashes as \\. Unescape in order: backslashes first, then quotes.
28    // Only unescape if the raw string doesn't parse as JSON (raw mode output).
29    if json_str.starts_with('{') && json_str.contains("\\\"") {
30        Some(json_str.replace("\\\\", "\\").replace("\\\"", "\""))
31    } else {
32        Some(json_str.to_string())
33    }
34}
35
36/// k6 executor
37pub struct K6Executor {
38    k6_path: String,
39}
40
41impl K6Executor {
42    /// Create a new k6 executor
43    pub fn new() -> Result<Self> {
44        let k6_path = which::which("k6")
45            .map_err(|_| BenchError::K6NotFound)?
46            .to_string_lossy()
47            .to_string();
48
49        Ok(Self { k6_path })
50    }
51
52    /// Check if k6 is installed
53    pub fn is_k6_installed() -> bool {
54        which::which("k6").is_ok()
55    }
56
57    /// Get k6 version
58    pub async fn get_version(&self) -> Result<String> {
59        let output = TokioCommand::new(&self.k6_path)
60            .arg("version")
61            .output()
62            .await
63            .map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
64
65        if !output.status.success() {
66            return Err(BenchError::K6ExecutionFailed("Failed to get k6 version".to_string()));
67        }
68
69        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
70    }
71
72    /// Execute a k6 script
73    pub async fn execute(
74        &self,
75        script_path: &Path,
76        output_dir: Option<&Path>,
77        verbose: bool,
78    ) -> Result<K6Results> {
79        println!("Starting load test...\n");
80
81        let mut cmd = TokioCommand::new(&self.k6_path);
82        cmd.arg("run");
83
84        // Add output options
85        if let Some(dir) = output_dir {
86            let summary_path = dir.join("summary.json");
87            cmd.arg("--summary-export").arg(summary_path);
88        }
89
90        // Add verbosity
91        if verbose {
92            cmd.arg("--verbose");
93        }
94
95        cmd.arg(script_path);
96        cmd.stdout(Stdio::piped());
97        cmd.stderr(Stdio::piped());
98
99        let mut child = cmd.spawn().map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
100
101        let stdout = child
102            .stdout
103            .take()
104            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stdout".to_string()))?;
105
106        let stderr = child
107            .stderr
108            .take()
109            .ok_or_else(|| BenchError::K6ExecutionFailed("Failed to capture stderr".to_string()))?;
110
111        // Stream output
112        let stdout_reader = BufReader::new(stdout);
113        let stderr_reader = BufReader::new(stderr);
114
115        let mut stdout_lines = stdout_reader.lines();
116        let mut stderr_lines = stderr_reader.lines();
117
118        // Create progress indicator
119        let spinner = ProgressBar::new_spinner();
120        spinner.set_style(
121            ProgressStyle::default_spinner().template("{spinner:.green} {msg}").unwrap(),
122        );
123        spinner.set_message("Running load test...");
124
125        // Collect failure details from k6's console.log output
126        // k6 may emit console.log to either stdout or stderr depending on version/config
127        let failure_details: Arc<tokio::sync::Mutex<Vec<String>>> =
128            Arc::new(tokio::sync::Mutex::new(Vec::new()));
129        let fd_stdout = Arc::clone(&failure_details);
130        let fd_stderr = Arc::clone(&failure_details);
131
132        // Read stdout lines, capturing MOCKFORGE_FAILURE markers
133        let stdout_handle = tokio::spawn(async move {
134            while let Ok(Some(line)) = stdout_lines.next_line().await {
135                if let Some(json_str) = extract_failure_json(&line) {
136                    fd_stdout.lock().await.push(json_str);
137                } else {
138                    spinner.set_message(line.clone());
139                    if !line.is_empty() && !line.contains("running") && !line.contains("default") {
140                        println!("{}", line);
141                    }
142                }
143            }
144            spinner.finish_and_clear();
145        });
146
147        // Read stderr lines, capturing MOCKFORGE_FAILURE markers
148        let stderr_handle = tokio::spawn(async move {
149            while let Ok(Some(line)) = stderr_lines.next_line().await {
150                if !line.is_empty() {
151                    if let Some(json_str) = extract_failure_json(&line) {
152                        fd_stderr.lock().await.push(json_str);
153                    } else {
154                        eprintln!("{}", line);
155                    }
156                }
157            }
158        });
159
160        // Wait for completion
161        let status =
162            child.wait().await.map_err(|e| BenchError::K6ExecutionFailed(e.to_string()))?;
163
164        // Wait for both reader tasks to finish processing all lines
165        let _ = stdout_handle.await;
166        let _ = stderr_handle.await;
167
168        if !status.success() {
169            return Err(BenchError::K6ExecutionFailed(format!(
170                "k6 exited with status: {}",
171                status
172            )));
173        }
174
175        // Write failure details to file if any were captured
176        if let Some(dir) = output_dir {
177            let details = failure_details.lock().await;
178            if !details.is_empty() {
179                let failure_path = dir.join("conformance-failure-details.json");
180                let parsed: Vec<serde_json::Value> =
181                    details.iter().filter_map(|s| serde_json::from_str(s).ok()).collect();
182                if let Ok(json) = serde_json::to_string_pretty(&parsed) {
183                    let _ = std::fs::write(&failure_path, json);
184                }
185            }
186        }
187
188        // Parse results if output directory was specified
189        let results = if let Some(dir) = output_dir {
190            Self::parse_results(dir)?
191        } else {
192            K6Results::default()
193        };
194
195        Ok(results)
196    }
197
198    /// Parse k6 results from JSON output
199    fn parse_results(output_dir: &Path) -> Result<K6Results> {
200        let summary_path = output_dir.join("summary.json");
201
202        if !summary_path.exists() {
203            return Ok(K6Results::default());
204        }
205
206        let content = std::fs::read_to_string(summary_path)
207            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
208
209        let json: serde_json::Value = serde_json::from_str(&content)
210            .map_err(|e| BenchError::ResultsParseError(e.to_string()))?;
211
212        let duration_values = &json["metrics"]["http_req_duration"]["values"];
213
214        Ok(K6Results {
215            total_requests: json["metrics"]["http_reqs"]["values"]["count"].as_u64().unwrap_or(0),
216            failed_requests: json["metrics"]["http_req_failed"]["values"]["passes"]
217                .as_u64()
218                .unwrap_or(0),
219            avg_duration_ms: duration_values["avg"].as_f64().unwrap_or(0.0),
220            p95_duration_ms: duration_values["p(95)"].as_f64().unwrap_or(0.0),
221            p99_duration_ms: duration_values["p(99)"].as_f64().unwrap_or(0.0),
222            rps: json["metrics"]["http_reqs"]["values"]["rate"].as_f64().unwrap_or(0.0),
223            vus_max: json["metrics"]["vus_max"]["values"]["value"].as_u64().unwrap_or(0) as u32,
224            min_duration_ms: duration_values["min"].as_f64().unwrap_or(0.0),
225            max_duration_ms: duration_values["max"].as_f64().unwrap_or(0.0),
226            med_duration_ms: duration_values["med"].as_f64().unwrap_or(0.0),
227            p90_duration_ms: duration_values["p(90)"].as_f64().unwrap_or(0.0),
228        })
229    }
230}
231
232impl Default for K6Executor {
233    fn default() -> Self {
234        Self::new().expect("k6 not found")
235    }
236}
237
238/// k6 test results
239#[derive(Debug, Clone, Default)]
240pub struct K6Results {
241    pub total_requests: u64,
242    pub failed_requests: u64,
243    pub avg_duration_ms: f64,
244    pub p95_duration_ms: f64,
245    pub p99_duration_ms: f64,
246    pub rps: f64,
247    pub vus_max: u32,
248    pub min_duration_ms: f64,
249    pub max_duration_ms: f64,
250    pub med_duration_ms: f64,
251    pub p90_duration_ms: f64,
252}
253
254impl K6Results {
255    /// Get error rate as a percentage
256    pub fn error_rate(&self) -> f64 {
257        if self.total_requests == 0 {
258            return 0.0;
259        }
260        (self.failed_requests as f64 / self.total_requests as f64) * 100.0
261    }
262
263    /// Get success rate as a percentage
264    pub fn success_rate(&self) -> f64 {
265        100.0 - self.error_rate()
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    use super::*;
272
273    #[test]
274    fn test_k6_results_error_rate() {
275        let results = K6Results {
276            total_requests: 100,
277            failed_requests: 5,
278            avg_duration_ms: 100.0,
279            p95_duration_ms: 200.0,
280            p99_duration_ms: 300.0,
281            ..Default::default()
282        };
283
284        assert_eq!(results.error_rate(), 5.0);
285        assert_eq!(results.success_rate(), 95.0);
286    }
287
288    #[test]
289    fn test_k6_results_zero_requests() {
290        let results = K6Results::default();
291        assert_eq!(results.error_rate(), 0.0);
292    }
293
294    #[test]
295    fn test_extract_failure_json_raw() {
296        let line = r#"MOCKFORGE_FAILURE:{"check":"test","expected":"status === 200"}"#;
297        let result = extract_failure_json(line).unwrap();
298        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
299        assert_eq!(parsed["check"], "test");
300    }
301
302    #[test]
303    fn test_extract_failure_json_logfmt() {
304        let line = r#"time="2026-01-01T00:00:00Z" level=info msg="MOCKFORGE_FAILURE:{\"check\":\"test\",\"response\":{\"body\":\"{\\\"key\\\":\\\"val\\\"}\"}} " source=console"#;
305        let result = extract_failure_json(line).unwrap();
306        let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
307        assert_eq!(parsed["check"], "test");
308        assert_eq!(parsed["response"]["body"], r#"{"key":"val"}"#);
309    }
310
311    #[test]
312    fn test_extract_failure_json_no_marker() {
313        assert!(extract_failure_json("just a regular log line").is_none());
314    }
315}