1use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
24#[derive(Debug, Clone)]
26pub struct TargetResult {
27 pub target_url: String,
29 pub target_index: usize,
31 pub results: K6Results,
33 pub output_dir: PathBuf,
35 pub success: bool,
37 pub error: Option<String>,
39}
40
41#[derive(Debug, Clone)]
43pub struct AggregatedResults {
44 pub target_results: Vec<TargetResult>,
46 pub total_targets: usize,
48 pub successful_targets: usize,
49 pub failed_targets: usize,
50 pub aggregated_metrics: AggregatedMetrics,
52}
53
54#[derive(Debug, Clone)]
56pub struct AggregatedMetrics {
57 pub total_requests: u64,
59 pub total_failed_requests: u64,
61 pub avg_duration_ms: f64,
63 pub p95_duration_ms: f64,
65 pub p99_duration_ms: f64,
67 pub error_rate: f64,
69 pub total_rps: f64,
71 pub avg_rps: f64,
73 pub total_vus_max: u32,
75}
76
77impl AggregatedMetrics {
78 fn from_results(results: &[TargetResult]) -> Self {
80 let mut total_requests = 0u64;
81 let mut total_failed_requests = 0u64;
82 let mut durations = Vec::new();
83 let mut p95_values = Vec::new();
84 let mut p99_values = Vec::new();
85 let mut total_rps = 0.0f64;
86 let mut total_vus_max = 0u32;
87 let mut successful_count = 0usize;
88
89 for result in results {
90 if result.success {
91 total_requests += result.results.total_requests;
92 total_failed_requests += result.results.failed_requests;
93 durations.push(result.results.avg_duration_ms);
94 p95_values.push(result.results.p95_duration_ms);
95 p99_values.push(result.results.p99_duration_ms);
96 total_rps += result.results.rps;
97 total_vus_max += result.results.vus_max;
98 successful_count += 1;
99 }
100 }
101
102 let avg_duration_ms = if !durations.is_empty() {
103 durations.iter().sum::<f64>() / durations.len() as f64
104 } else {
105 0.0
106 };
107
108 let p95_duration_ms = if !p95_values.is_empty() {
109 p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
110 let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
111 p95_values[index.min(p95_values.len() - 1)]
112 } else {
113 0.0
114 };
115
116 let p99_duration_ms = if !p99_values.is_empty() {
117 p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
118 let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
119 p99_values[index.min(p99_values.len() - 1)]
120 } else {
121 0.0
122 };
123
124 let error_rate = if total_requests > 0 {
125 (total_failed_requests as f64 / total_requests as f64) * 100.0
126 } else {
127 0.0
128 };
129
130 let avg_rps = if successful_count > 0 {
131 total_rps / successful_count as f64
132 } else {
133 0.0
134 };
135
136 Self {
137 total_requests,
138 total_failed_requests,
139 avg_duration_ms,
140 p95_duration_ms,
141 p99_duration_ms,
142 error_rate,
143 total_rps,
144 avg_rps,
145 total_vus_max,
146 }
147 }
148}
149
150pub struct ParallelExecutor {
152 base_command: BenchCommand,
154 targets: Vec<TargetConfig>,
156 max_concurrency: usize,
158 base_output: PathBuf,
160}
161
162impl ParallelExecutor {
163 pub fn new(
165 base_command: BenchCommand,
166 targets: Vec<TargetConfig>,
167 max_concurrency: usize,
168 ) -> Self {
169 let base_output = base_command.output.clone();
170 Self {
171 base_command,
172 targets,
173 max_concurrency,
174 base_output,
175 }
176 }
177
178 pub async fn execute_all(&self) -> Result<AggregatedResults> {
180 let total_targets = self.targets.len();
181 TerminalReporter::print_progress(&format!(
182 "Starting parallel execution for {} targets (max concurrency: {})",
183 total_targets, self.max_concurrency
184 ));
185
186 if !K6Executor::is_k6_installed() {
188 TerminalReporter::print_error("k6 is not installed");
189 TerminalReporter::print_warning(
190 "Install k6 from: https://k6.io/docs/get-started/installation/",
191 );
192 return Err(BenchError::K6NotFound);
193 }
194
195 TerminalReporter::print_progress("Loading OpenAPI specification(s)...");
197 let merged_spec = self.base_command.load_and_merge_specs().await?;
198 let parser = SpecParser::from_spec(merged_spec);
199 TerminalReporter::print_success("Specification(s) loaded");
200
201 let operations = if let Some(filter) = &self.base_command.operations {
203 parser.filter_operations(filter)?
204 } else {
205 parser.get_operations()
206 };
207
208 if operations.is_empty() {
209 return Err(BenchError::Other("No operations found in spec".to_string()));
210 }
211
212 TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
213
214 TerminalReporter::print_progress("Generating request templates...");
216 let templates: Vec<_> = operations
217 .iter()
218 .map(RequestGenerator::generate_template)
219 .collect::<Result<Vec<_>>>()?;
220 TerminalReporter::print_success("Request templates generated");
221
222 let mut per_target_data: HashMap<
224 PathBuf,
225 (Vec<crate::request_gen::RequestTemplate>, Option<String>),
226 > = HashMap::new();
227 {
228 let mut unique_specs: Vec<PathBuf> = Vec::new();
229 for t in &self.targets {
230 if let Some(spec_path) = &t.spec {
231 if !unique_specs.contains(spec_path) {
232 unique_specs.push(spec_path.clone());
233 }
234 }
235 }
236 for spec_path in &unique_specs {
237 TerminalReporter::print_progress(&format!(
238 "Loading per-target spec: {}",
239 spec_path.display()
240 ));
241 match SpecParser::from_file(spec_path).await {
242 Ok(target_parser) => {
243 let target_ops = if let Some(filter) = &self.base_command.operations {
244 match target_parser.filter_operations(filter) {
245 Ok(ops) => ops,
246 Err(e) => {
247 TerminalReporter::print_warning(&format!(
248 "Failed to filter operations from {}: {}. Using shared spec.",
249 spec_path.display(),
250 e
251 ));
252 continue;
253 }
254 }
255 } else {
256 target_parser.get_operations()
257 };
258 let target_templates: Vec<_> = match target_ops
259 .iter()
260 .map(RequestGenerator::generate_template)
261 .collect::<Result<Vec<_>>>()
262 {
263 Ok(t) => t,
264 Err(e) => {
265 TerminalReporter::print_warning(&format!(
266 "Failed to generate templates from {}: {}. Using shared spec.",
267 spec_path.display(),
268 e
269 ));
270 continue;
271 }
272 };
273 let target_base_path = if let Some(cli_bp) = &self.base_command.base_path {
274 if cli_bp.is_empty() {
275 None
276 } else {
277 Some(cli_bp.clone())
278 }
279 } else {
280 target_parser.get_base_path()
281 };
282 TerminalReporter::print_success(&format!(
283 "Loaded {} operations from {}",
284 target_templates.len(),
285 spec_path.display()
286 ));
287 per_target_data
288 .insert(spec_path.clone(), (target_templates, target_base_path));
289 }
290 Err(e) => {
291 TerminalReporter::print_warning(&format!(
292 "Failed to load per-target spec {}: {}. Targets using this spec will use the shared spec.",
293 spec_path.display(),
294 e
295 ));
296 }
297 }
298 }
299 }
300
301 let base_headers = self.base_command.parse_headers()?;
303
304 let base_path = self.resolve_base_path(&parser);
306 if let Some(ref bp) = base_path {
307 TerminalReporter::print_progress(&format!("Using base path: {}", bp));
308 }
309
310 let scenario = LoadScenario::from_str(&self.base_command.scenario)
312 .map_err(BenchError::InvalidScenario)?;
313
314 let duration_secs = BenchCommand::parse_duration(&self.base_command.duration)?;
315
316 let security_testing_enabled =
318 self.base_command.security_test || self.base_command.wafbench_dir.is_some();
319
320 let has_advanced_features = self.base_command.data_file.is_some()
322 || self.base_command.error_rate.is_some()
323 || self.base_command.security_test
324 || self.base_command.parallel_create.is_some()
325 || self.base_command.wafbench_dir.is_some();
326
327 let enhancement_code = if has_advanced_features {
328 let dummy_script = "export const options = {};";
329 let enhanced = self.base_command.generate_enhanced_script(dummy_script)?;
330 if let Some(pos) = enhanced.find("export const options") {
331 enhanced[..pos].to_string()
332 } else {
333 String::new()
334 }
335 } else {
336 String::new()
337 };
338
339 let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
341 let multi_progress = MultiProgress::new();
342
343 let progress_bars: Vec<ProgressBar> = (0..total_targets)
345 .map(|i| {
346 let pb = multi_progress.add(ProgressBar::new(1));
347 pb.set_style(
348 ProgressStyle::default_bar()
349 .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
350 .unwrap(),
351 );
352 pb.set_message(format!("Target {}", i + 1));
353 pb
354 })
355 .collect();
356
357 let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
359
360 for (index, target) in self.targets.iter().enumerate() {
361 let target = target.clone();
362 let duration = self.base_command.duration.clone();
364 let vus = self.base_command.vus;
365 let scenario_str = self.base_command.scenario.clone();
366 let operations = self.base_command.operations.clone();
367 let auth = self.base_command.auth.clone();
368 let headers = self.base_command.headers.clone();
369 let threshold_percentile = self.base_command.threshold_percentile.clone();
370 let threshold_ms = self.base_command.threshold_ms;
371 let max_error_rate = self.base_command.max_error_rate;
372 let verbose = self.base_command.verbose;
373 let skip_tls_verify = self.base_command.skip_tls_verify;
374
375 let (templates, base_path) = if let Some(spec_path) = &target.spec {
377 if let Some((t, bp)) = per_target_data.get(spec_path) {
378 (t.clone(), bp.clone())
379 } else {
380 (templates.clone(), base_path.clone())
381 }
382 } else {
383 (templates.clone(), base_path.clone())
384 };
385
386 let base_headers = base_headers.clone();
387 let scenario = scenario.clone();
388 let duration_secs = duration_secs;
389 let base_output = self.base_output.clone();
390 let semaphore = semaphore.clone();
391 let progress_bar = progress_bars[index].clone();
392 let target_index = index;
393 let security_testing_enabled = security_testing_enabled;
394 let enhancement_code = enhancement_code.clone();
395
396 let handle = tokio::spawn(async move {
397 let _permit = semaphore.acquire().await.map_err(|e| {
399 BenchError::Other(format!("Failed to acquire semaphore: {}", e))
400 })?;
401
402 progress_bar.set_message(format!("Testing {}", target.url));
403
404 let result = Self::execute_single_target_internal(
406 &duration,
407 vus,
408 &scenario_str,
409 &operations,
410 &auth,
411 &headers,
412 &threshold_percentile,
413 threshold_ms,
414 max_error_rate,
415 verbose,
416 skip_tls_verify,
417 base_path.as_ref(),
418 &target,
419 target_index,
420 &templates,
421 &base_headers,
422 &scenario,
423 duration_secs,
424 &base_output,
425 security_testing_enabled,
426 &enhancement_code,
427 )
428 .await;
429
430 progress_bar.inc(1);
431 progress_bar.finish_with_message(format!("Completed {}", target.url));
432
433 result
434 });
435
436 handles.push(handle);
437 }
438
439 let mut target_results = Vec::new();
441 for (index, handle) in handles.into_iter().enumerate() {
442 match handle.await {
443 Ok(Ok(result)) => {
444 target_results.push(result);
445 }
446 Ok(Err(e)) => {
447 let target_url = self.targets[index].url.clone();
449 target_results.push(TargetResult {
450 target_url: target_url.clone(),
451 target_index: index,
452 results: K6Results::default(),
453 output_dir: self.base_output.join(format!("target_{}", index + 1)),
454 success: false,
455 error: Some(e.to_string()),
456 });
457 }
458 Err(e) => {
459 let target_url = self.targets[index].url.clone();
461 target_results.push(TargetResult {
462 target_url: target_url.clone(),
463 target_index: index,
464 results: K6Results::default(),
465 output_dir: self.base_output.join(format!("target_{}", index + 1)),
466 success: false,
467 error: Some(format!("Task join error: {}", e)),
468 });
469 }
470 }
471 }
472
473 target_results.sort_by_key(|r| r.target_index);
475
476 let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
478
479 let successful_targets = target_results.iter().filter(|r| r.success).count();
480 let failed_targets = total_targets - successful_targets;
481
482 Ok(AggregatedResults {
483 target_results,
484 total_targets,
485 successful_targets,
486 failed_targets,
487 aggregated_metrics,
488 })
489 }
490
491 fn resolve_base_path(&self, parser: &SpecParser) -> Option<String> {
493 if let Some(cli_base_path) = &self.base_command.base_path {
495 if cli_base_path.is_empty() {
496 return None;
497 }
498 return Some(cli_base_path.clone());
499 }
500 parser.get_base_path()
502 }
503
504 #[allow(clippy::too_many_arguments)]
506 async fn execute_single_target_internal(
507 _duration: &str,
508 vus: u32,
509 _scenario_str: &str,
510 _operations: &Option<String>,
511 auth: &Option<String>,
512 _headers: &Option<String>,
513 threshold_percentile: &str,
514 threshold_ms: u64,
515 max_error_rate: f64,
516 verbose: bool,
517 skip_tls_verify: bool,
518 base_path: Option<&String>,
519 target: &TargetConfig,
520 target_index: usize,
521 templates: &[crate::request_gen::RequestTemplate],
522 base_headers: &HashMap<String, String>,
523 scenario: &LoadScenario,
524 duration_secs: u64,
525 base_output: &Path,
526 security_testing_enabled: bool,
527 enhancement_code: &str,
528 ) -> Result<TargetResult> {
529 let mut custom_headers = base_headers.clone();
531 if let Some(target_headers) = &target.headers {
532 custom_headers.extend(target_headers.clone());
533 }
534
535 let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
537
538 let k6_config = K6Config {
540 target_url: target.url.clone(),
541 base_path: base_path.cloned(),
542 scenario: scenario.clone(),
543 duration_secs,
544 max_vus: vus,
545 threshold_percentile: threshold_percentile.to_string(),
546 threshold_ms,
547 max_error_rate,
548 auth_header,
549 custom_headers,
550 skip_tls_verify,
551 security_testing_enabled,
552 };
553
554 let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
556 let mut script = generator.generate()?;
557
558 if !enhancement_code.is_empty() {
560 if let Some(pos) = script.find("export const options") {
561 script.insert_str(pos, enhancement_code);
562 }
563 }
564
565 let validation_errors = K6ScriptGenerator::validate_script(&script);
567 if !validation_errors.is_empty() {
568 return Err(BenchError::Other(format!(
569 "Script validation failed for target {}: {}",
570 target.url,
571 validation_errors.join(", ")
572 )));
573 }
574
575 let output_dir = base_output.join(format!("target_{}", target_index + 1));
577 std::fs::create_dir_all(&output_dir)?;
578
579 let script_path = output_dir.join("k6-script.js");
581 std::fs::write(&script_path, script)?;
582
583 let executor = K6Executor::new()?;
585 let results = executor.execute(&script_path, Some(&output_dir), verbose).await;
586
587 match results {
588 Ok(k6_results) => Ok(TargetResult {
589 target_url: target.url.clone(),
590 target_index,
591 results: k6_results,
592 output_dir,
593 success: true,
594 error: None,
595 }),
596 Err(e) => Ok(TargetResult {
597 target_url: target.url.clone(),
598 target_index,
599 results: K6Results::default(),
600 output_dir,
601 success: false,
602 error: Some(e.to_string()),
603 }),
604 }
605 }
606}
607
608#[cfg(test)]
609mod tests {
610 use super::*;
611
612 #[test]
613 fn test_aggregated_metrics_from_results() {
614 let results = vec![
615 TargetResult {
616 target_url: "http://api1.com".to_string(),
617 target_index: 0,
618 results: K6Results {
619 total_requests: 100,
620 failed_requests: 5,
621 avg_duration_ms: 100.0,
622 p95_duration_ms: 200.0,
623 p99_duration_ms: 300.0,
624 ..Default::default()
625 },
626 output_dir: PathBuf::from("output1"),
627 success: true,
628 error: None,
629 },
630 TargetResult {
631 target_url: "http://api2.com".to_string(),
632 target_index: 1,
633 results: K6Results {
634 total_requests: 200,
635 failed_requests: 10,
636 avg_duration_ms: 150.0,
637 p95_duration_ms: 250.0,
638 p99_duration_ms: 350.0,
639 ..Default::default()
640 },
641 output_dir: PathBuf::from("output2"),
642 success: true,
643 error: None,
644 },
645 ];
646
647 let metrics = AggregatedMetrics::from_results(&results);
648 assert_eq!(metrics.total_requests, 300);
649 assert_eq!(metrics.total_failed_requests, 15);
650 assert_eq!(metrics.avg_duration_ms, 125.0); }
652
653 #[test]
654 fn test_aggregated_metrics_with_failed_targets() {
655 let results = vec![
656 TargetResult {
657 target_url: "http://api1.com".to_string(),
658 target_index: 0,
659 results: K6Results {
660 total_requests: 100,
661 failed_requests: 5,
662 avg_duration_ms: 100.0,
663 p95_duration_ms: 200.0,
664 p99_duration_ms: 300.0,
665 ..Default::default()
666 },
667 output_dir: PathBuf::from("output1"),
668 success: true,
669 error: None,
670 },
671 TargetResult {
672 target_url: "http://api2.com".to_string(),
673 target_index: 1,
674 results: K6Results::default(),
675 output_dir: PathBuf::from("output2"),
676 success: false,
677 error: Some("Network error".to_string()),
678 },
679 ];
680
681 let metrics = AggregatedMetrics::from_results(&results);
682 assert_eq!(metrics.total_requests, 100);
684 assert_eq!(metrics.total_failed_requests, 5);
685 assert_eq!(metrics.avg_duration_ms, 100.0);
686 }
687
688 #[test]
689 fn test_aggregated_metrics_empty_results() {
690 let results = vec![];
691 let metrics = AggregatedMetrics::from_results(&results);
692 assert_eq!(metrics.total_requests, 0);
693 assert_eq!(metrics.total_failed_requests, 0);
694 assert_eq!(metrics.avg_duration_ms, 0.0);
695 assert_eq!(metrics.error_rate, 0.0);
696 }
697
698 #[test]
699 fn test_aggregated_metrics_error_rate_calculation() {
700 let results = vec![TargetResult {
701 target_url: "http://api1.com".to_string(),
702 target_index: 0,
703 results: K6Results {
704 total_requests: 1000,
705 failed_requests: 50,
706 avg_duration_ms: 100.0,
707 p95_duration_ms: 200.0,
708 p99_duration_ms: 300.0,
709 ..Default::default()
710 },
711 output_dir: PathBuf::from("output1"),
712 success: true,
713 error: None,
714 }];
715
716 let metrics = AggregatedMetrics::from_results(&results);
717 assert_eq!(metrics.error_rate, 5.0); }
719
720 #[test]
721 fn test_aggregated_metrics_p95_p99_calculation() {
722 let results = vec![
723 TargetResult {
724 target_url: "http://api1.com".to_string(),
725 target_index: 0,
726 results: K6Results {
727 total_requests: 100,
728 failed_requests: 0,
729 avg_duration_ms: 100.0,
730 p95_duration_ms: 150.0,
731 p99_duration_ms: 200.0,
732 ..Default::default()
733 },
734 output_dir: PathBuf::from("output1"),
735 success: true,
736 error: None,
737 },
738 TargetResult {
739 target_url: "http://api2.com".to_string(),
740 target_index: 1,
741 results: K6Results {
742 total_requests: 100,
743 failed_requests: 0,
744 avg_duration_ms: 200.0,
745 p95_duration_ms: 250.0,
746 p99_duration_ms: 300.0,
747 ..Default::default()
748 },
749 output_dir: PathBuf::from("output2"),
750 success: true,
751 error: None,
752 },
753 TargetResult {
754 target_url: "http://api3.com".to_string(),
755 target_index: 2,
756 results: K6Results {
757 total_requests: 100,
758 failed_requests: 0,
759 avg_duration_ms: 300.0,
760 p95_duration_ms: 350.0,
761 p99_duration_ms: 400.0,
762 ..Default::default()
763 },
764 output_dir: PathBuf::from("output3"),
765 success: true,
766 error: None,
767 },
768 ];
769
770 let metrics = AggregatedMetrics::from_results(&results);
771 assert_eq!(metrics.p95_duration_ms, 350.0);
774 assert_eq!(metrics.p99_duration_ms, 400.0);
775 }
776}