1use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
24#[derive(Debug, Clone)]
26pub struct TargetResult {
27 pub target_url: String,
29 pub target_index: usize,
31 pub results: K6Results,
33 pub output_dir: PathBuf,
35 pub success: bool,
37 pub error: Option<String>,
39}
40
41#[derive(Debug, Clone)]
43pub struct AggregatedResults {
44 pub target_results: Vec<TargetResult>,
46 pub total_targets: usize,
48 pub successful_targets: usize,
49 pub failed_targets: usize,
50 pub aggregated_metrics: AggregatedMetrics,
52}
53
54#[derive(Debug, Clone)]
56pub struct AggregatedMetrics {
57 pub total_requests: u64,
59 pub total_failed_requests: u64,
61 pub avg_duration_ms: f64,
63 pub p95_duration_ms: f64,
65 pub p99_duration_ms: f64,
67 pub error_rate: f64,
69}
70
71impl AggregatedMetrics {
72 fn from_results(results: &[TargetResult]) -> Self {
74 let mut total_requests = 0u64;
75 let mut total_failed_requests = 0u64;
76 let mut durations = Vec::new();
77 let mut p95_values = Vec::new();
78 let mut p99_values = Vec::new();
79
80 for result in results {
81 if result.success {
82 total_requests += result.results.total_requests;
83 total_failed_requests += result.results.failed_requests;
84 durations.push(result.results.avg_duration_ms);
85 p95_values.push(result.results.p95_duration_ms);
86 p99_values.push(result.results.p99_duration_ms);
87 }
88 }
89
90 let avg_duration_ms = if !durations.is_empty() {
91 durations.iter().sum::<f64>() / durations.len() as f64
92 } else {
93 0.0
94 };
95
96 let p95_duration_ms = if !p95_values.is_empty() {
97 p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
98 let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
99 p95_values[index.min(p95_values.len() - 1)]
100 } else {
101 0.0
102 };
103
104 let p99_duration_ms = if !p99_values.is_empty() {
105 p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
106 let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
107 p99_values[index.min(p99_values.len() - 1)]
108 } else {
109 0.0
110 };
111
112 let error_rate = if total_requests > 0 {
113 (total_failed_requests as f64 / total_requests as f64) * 100.0
114 } else {
115 0.0
116 };
117
118 Self {
119 total_requests,
120 total_failed_requests,
121 avg_duration_ms,
122 p95_duration_ms,
123 p99_duration_ms,
124 error_rate,
125 }
126 }
127}
128
129pub struct ParallelExecutor {
131 base_command: BenchCommand,
133 targets: Vec<TargetConfig>,
135 max_concurrency: usize,
137 base_output: PathBuf,
139}
140
141impl ParallelExecutor {
142 pub fn new(
144 base_command: BenchCommand,
145 targets: Vec<TargetConfig>,
146 max_concurrency: usize,
147 ) -> Self {
148 let base_output = base_command.output.clone();
149 Self {
150 base_command,
151 targets,
152 max_concurrency,
153 base_output,
154 }
155 }
156
157 pub async fn execute_all(&self) -> Result<AggregatedResults> {
159 let total_targets = self.targets.len();
160 TerminalReporter::print_progress(&format!(
161 "Starting parallel execution for {} targets (max concurrency: {})",
162 total_targets, self.max_concurrency
163 ));
164
165 if !K6Executor::is_k6_installed() {
167 TerminalReporter::print_error("k6 is not installed");
168 TerminalReporter::print_warning(
169 "Install k6 from: https://k6.io/docs/get-started/installation/",
170 );
171 return Err(BenchError::K6NotFound);
172 }
173
174 TerminalReporter::print_progress("Loading OpenAPI specification...");
176 let parser = SpecParser::from_file(&self.base_command.spec).await?;
177 TerminalReporter::print_success("Specification loaded");
178
179 let operations = if let Some(filter) = &self.base_command.operations {
181 parser.filter_operations(filter)?
182 } else {
183 parser.get_operations()
184 };
185
186 if operations.is_empty() {
187 return Err(BenchError::Other("No operations found in spec".to_string()));
188 }
189
190 TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
191
192 TerminalReporter::print_progress("Generating request templates...");
194 let templates: Vec<_> = operations
195 .iter()
196 .map(RequestGenerator::generate_template)
197 .collect::<Result<Vec<_>>>()?;
198 TerminalReporter::print_success("Request templates generated");
199
200 let base_headers = self.base_command.parse_headers()?;
202
203 let scenario = LoadScenario::from_str(&self.base_command.scenario)
205 .map_err(BenchError::InvalidScenario)?;
206
207 let duration_secs = BenchCommand::parse_duration(&self.base_command.duration)?;
208
209 let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
211 let multi_progress = MultiProgress::new();
212
213 let progress_bars: Vec<ProgressBar> = (0..total_targets)
215 .map(|i| {
216 let pb = multi_progress.add(ProgressBar::new(1));
217 pb.set_style(
218 ProgressStyle::default_bar()
219 .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
220 .unwrap(),
221 );
222 pb.set_message(format!("Target {}", i + 1));
223 pb
224 })
225 .collect();
226
227 let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
229
230 for (index, target) in self.targets.iter().enumerate() {
231 let target = target.clone();
232 let spec = self.base_command.spec.clone();
234 let duration = self.base_command.duration.clone();
235 let vus = self.base_command.vus;
236 let scenario_str = self.base_command.scenario.clone();
237 let operations = self.base_command.operations.clone();
238 let auth = self.base_command.auth.clone();
239 let headers = self.base_command.headers.clone();
240 let threshold_percentile = self.base_command.threshold_percentile.clone();
241 let threshold_ms = self.base_command.threshold_ms;
242 let max_error_rate = self.base_command.max_error_rate;
243 let verbose = self.base_command.verbose;
244 let skip_tls_verify = self.base_command.skip_tls_verify;
245
246 let templates = templates.clone();
247 let base_headers = base_headers.clone();
248 let scenario = scenario.clone();
249 let duration_secs = duration_secs;
250 let base_output = self.base_output.clone();
251 let semaphore = semaphore.clone();
252 let progress_bar = progress_bars[index].clone();
253 let target_index = index;
254
255 let handle = tokio::spawn(async move {
256 let _permit = semaphore.acquire().await.map_err(|e| {
258 BenchError::Other(format!("Failed to acquire semaphore: {}", e))
259 })?;
260
261 progress_bar.set_message(format!("Testing {}", target.url));
262
263 let result = Self::execute_single_target_internal(
267 &spec,
268 &duration,
269 vus,
270 &scenario_str,
271 &operations,
272 &auth,
273 &headers,
274 &threshold_percentile,
275 threshold_ms,
276 max_error_rate,
277 verbose,
278 skip_tls_verify,
279 &target,
280 target_index,
281 &templates,
282 &base_headers,
283 &scenario,
284 duration_secs,
285 &base_output,
286 )
287 .await;
288
289 progress_bar.inc(1);
290 progress_bar.finish_with_message(format!("Completed {}", target.url));
291
292 result
293 });
294
295 handles.push(handle);
296 }
297
298 let mut target_results = Vec::new();
300 for (index, handle) in handles.into_iter().enumerate() {
301 match handle.await {
302 Ok(Ok(result)) => {
303 target_results.push(result);
304 }
305 Ok(Err(e)) => {
306 let target_url = self.targets[index].url.clone();
308 target_results.push(TargetResult {
309 target_url: target_url.clone(),
310 target_index: index,
311 results: K6Results::default(),
312 output_dir: self.base_output.join(format!("target_{}", index + 1)),
313 success: false,
314 error: Some(e.to_string()),
315 });
316 }
317 Err(e) => {
318 let target_url = self.targets[index].url.clone();
320 target_results.push(TargetResult {
321 target_url: target_url.clone(),
322 target_index: index,
323 results: K6Results::default(),
324 output_dir: self.base_output.join(format!("target_{}", index + 1)),
325 success: false,
326 error: Some(format!("Task join error: {}", e)),
327 });
328 }
329 }
330 }
331
332 target_results.sort_by_key(|r| r.target_index);
334
335 let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
337
338 let successful_targets = target_results.iter().filter(|r| r.success).count();
339 let failed_targets = total_targets - successful_targets;
340
341 Ok(AggregatedResults {
342 target_results,
343 total_targets,
344 successful_targets,
345 failed_targets,
346 aggregated_metrics,
347 })
348 }
349
350 async fn execute_single_target_internal(
352 spec: &PathBuf,
353 duration: &str,
354 vus: u32,
355 scenario_str: &str,
356 operations: &Option<String>,
357 auth: &Option<String>,
358 headers: &Option<String>,
359 threshold_percentile: &str,
360 threshold_ms: u64,
361 max_error_rate: f64,
362 verbose: bool,
363 skip_tls_verify: bool,
364 target: &TargetConfig,
365 target_index: usize,
366 templates: &[crate::request_gen::RequestTemplate],
367 base_headers: &HashMap<String, String>,
368 scenario: &LoadScenario,
369 duration_secs: u64,
370 base_output: &Path,
371 ) -> Result<TargetResult> {
372 let mut custom_headers = base_headers.clone();
374 if let Some(target_headers) = &target.headers {
375 custom_headers.extend(target_headers.clone());
376 }
377
378 let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
380
381 let k6_config = K6Config {
383 target_url: target.url.clone(),
384 scenario: scenario.clone(),
385 duration_secs,
386 max_vus: vus,
387 threshold_percentile: threshold_percentile.to_string(),
388 threshold_ms,
389 max_error_rate,
390 auth_header,
391 custom_headers,
392 skip_tls_verify,
393 };
394
395 let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
397 let script = generator.generate()?;
398
399 let validation_errors = K6ScriptGenerator::validate_script(&script);
401 if !validation_errors.is_empty() {
402 return Err(BenchError::Other(format!(
403 "Script validation failed for target {}: {}",
404 target.url,
405 validation_errors.join(", ")
406 )));
407 }
408
409 let output_dir = base_output.join(format!("target_{}", target_index + 1));
411 std::fs::create_dir_all(&output_dir)?;
412
413 let script_path = output_dir.join("k6-script.js");
415 std::fs::write(&script_path, script)?;
416
417 let executor = K6Executor::new()?;
419 let results = executor.execute(&script_path, Some(&output_dir), verbose).await;
420
421 match results {
422 Ok(k6_results) => Ok(TargetResult {
423 target_url: target.url.clone(),
424 target_index,
425 results: k6_results,
426 output_dir,
427 success: true,
428 error: None,
429 }),
430 Err(e) => Ok(TargetResult {
431 target_url: target.url.clone(),
432 target_index,
433 results: K6Results::default(),
434 output_dir,
435 success: false,
436 error: Some(e.to_string()),
437 }),
438 }
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445
446 #[test]
447 fn test_aggregated_metrics_from_results() {
448 let results = vec![
449 TargetResult {
450 target_url: "http://api1.com".to_string(),
451 target_index: 0,
452 results: K6Results {
453 total_requests: 100,
454 failed_requests: 5,
455 avg_duration_ms: 100.0,
456 p95_duration_ms: 200.0,
457 p99_duration_ms: 300.0,
458 },
459 output_dir: PathBuf::from("output1"),
460 success: true,
461 error: None,
462 },
463 TargetResult {
464 target_url: "http://api2.com".to_string(),
465 target_index: 1,
466 results: K6Results {
467 total_requests: 200,
468 failed_requests: 10,
469 avg_duration_ms: 150.0,
470 p95_duration_ms: 250.0,
471 p99_duration_ms: 350.0,
472 },
473 output_dir: PathBuf::from("output2"),
474 success: true,
475 error: None,
476 },
477 ];
478
479 let metrics = AggregatedMetrics::from_results(&results);
480 assert_eq!(metrics.total_requests, 300);
481 assert_eq!(metrics.total_failed_requests, 15);
482 assert_eq!(metrics.avg_duration_ms, 125.0); }
484
485 #[test]
486 fn test_aggregated_metrics_with_failed_targets() {
487 let results = vec![
488 TargetResult {
489 target_url: "http://api1.com".to_string(),
490 target_index: 0,
491 results: K6Results {
492 total_requests: 100,
493 failed_requests: 5,
494 avg_duration_ms: 100.0,
495 p95_duration_ms: 200.0,
496 p99_duration_ms: 300.0,
497 },
498 output_dir: PathBuf::from("output1"),
499 success: true,
500 error: None,
501 },
502 TargetResult {
503 target_url: "http://api2.com".to_string(),
504 target_index: 1,
505 results: K6Results::default(),
506 output_dir: PathBuf::from("output2"),
507 success: false,
508 error: Some("Network error".to_string()),
509 },
510 ];
511
512 let metrics = AggregatedMetrics::from_results(&results);
513 assert_eq!(metrics.total_requests, 100);
515 assert_eq!(metrics.total_failed_requests, 5);
516 assert_eq!(metrics.avg_duration_ms, 100.0);
517 }
518
519 #[test]
520 fn test_aggregated_metrics_empty_results() {
521 let results = vec![];
522 let metrics = AggregatedMetrics::from_results(&results);
523 assert_eq!(metrics.total_requests, 0);
524 assert_eq!(metrics.total_failed_requests, 0);
525 assert_eq!(metrics.avg_duration_ms, 0.0);
526 assert_eq!(metrics.error_rate, 0.0);
527 }
528
529 #[test]
530 fn test_aggregated_metrics_error_rate_calculation() {
531 let results = vec![TargetResult {
532 target_url: "http://api1.com".to_string(),
533 target_index: 0,
534 results: K6Results {
535 total_requests: 1000,
536 failed_requests: 50,
537 avg_duration_ms: 100.0,
538 p95_duration_ms: 200.0,
539 p99_duration_ms: 300.0,
540 },
541 output_dir: PathBuf::from("output1"),
542 success: true,
543 error: None,
544 }];
545
546 let metrics = AggregatedMetrics::from_results(&results);
547 assert_eq!(metrics.error_rate, 5.0); }
549
550 #[test]
551 fn test_aggregated_metrics_p95_p99_calculation() {
552 let results = vec![
553 TargetResult {
554 target_url: "http://api1.com".to_string(),
555 target_index: 0,
556 results: K6Results {
557 total_requests: 100,
558 failed_requests: 0,
559 avg_duration_ms: 100.0,
560 p95_duration_ms: 150.0,
561 p99_duration_ms: 200.0,
562 },
563 output_dir: PathBuf::from("output1"),
564 success: true,
565 error: None,
566 },
567 TargetResult {
568 target_url: "http://api2.com".to_string(),
569 target_index: 1,
570 results: K6Results {
571 total_requests: 100,
572 failed_requests: 0,
573 avg_duration_ms: 200.0,
574 p95_duration_ms: 250.0,
575 p99_duration_ms: 300.0,
576 },
577 output_dir: PathBuf::from("output2"),
578 success: true,
579 error: None,
580 },
581 TargetResult {
582 target_url: "http://api3.com".to_string(),
583 target_index: 2,
584 results: K6Results {
585 total_requests: 100,
586 failed_requests: 0,
587 avg_duration_ms: 300.0,
588 p95_duration_ms: 350.0,
589 p99_duration_ms: 400.0,
590 },
591 output_dir: PathBuf::from("output3"),
592 success: true,
593 error: None,
594 },
595 ];
596
597 let metrics = AggregatedMetrics::from_results(&results);
598 assert_eq!(metrics.p95_duration_ms, 350.0);
601 assert_eq!(metrics.p99_duration_ms, 400.0);
602 }
603}