1use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
24#[derive(Debug, Clone)]
26pub struct TargetResult {
27 pub target_url: String,
29 pub target_index: usize,
31 pub results: K6Results,
33 pub output_dir: PathBuf,
35 pub success: bool,
37 pub error: Option<String>,
39}
40
41#[derive(Debug, Clone)]
43pub struct AggregatedResults {
44 pub target_results: Vec<TargetResult>,
46 pub total_targets: usize,
48 pub successful_targets: usize,
49 pub failed_targets: usize,
50 pub aggregated_metrics: AggregatedMetrics,
52}
53
54#[derive(Debug, Clone)]
56pub struct AggregatedMetrics {
57 pub total_requests: u64,
59 pub total_failed_requests: u64,
61 pub avg_duration_ms: f64,
63 pub p95_duration_ms: f64,
65 pub p99_duration_ms: f64,
67 pub error_rate: f64,
69 pub total_rps: f64,
71 pub avg_rps: f64,
73 pub total_vus_max: u32,
75}
76
77impl AggregatedMetrics {
78 fn from_results(results: &[TargetResult]) -> Self {
80 let mut total_requests = 0u64;
81 let mut total_failed_requests = 0u64;
82 let mut durations = Vec::new();
83 let mut p95_values = Vec::new();
84 let mut p99_values = Vec::new();
85 let mut total_rps = 0.0f64;
86 let mut total_vus_max = 0u32;
87 let mut successful_count = 0usize;
88
89 for result in results {
90 if result.success {
91 total_requests += result.results.total_requests;
92 total_failed_requests += result.results.failed_requests;
93 durations.push(result.results.avg_duration_ms);
94 p95_values.push(result.results.p95_duration_ms);
95 p99_values.push(result.results.p99_duration_ms);
96 total_rps += result.results.rps;
97 total_vus_max += result.results.vus_max;
98 successful_count += 1;
99 }
100 }
101
102 let avg_duration_ms = if !durations.is_empty() {
103 durations.iter().sum::<f64>() / durations.len() as f64
104 } else {
105 0.0
106 };
107
108 let p95_duration_ms = if !p95_values.is_empty() {
109 p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
110 let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
111 p95_values[index.min(p95_values.len() - 1)]
112 } else {
113 0.0
114 };
115
116 let p99_duration_ms = if !p99_values.is_empty() {
117 p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
118 let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
119 p99_values[index.min(p99_values.len() - 1)]
120 } else {
121 0.0
122 };
123
124 let error_rate = if total_requests > 0 {
125 (total_failed_requests as f64 / total_requests as f64) * 100.0
126 } else {
127 0.0
128 };
129
130 let avg_rps = if successful_count > 0 {
131 total_rps / successful_count as f64
132 } else {
133 0.0
134 };
135
136 Self {
137 total_requests,
138 total_failed_requests,
139 avg_duration_ms,
140 p95_duration_ms,
141 p99_duration_ms,
142 error_rate,
143 total_rps,
144 avg_rps,
145 total_vus_max,
146 }
147 }
148}
149
150pub struct ParallelExecutor {
152 base_command: BenchCommand,
154 targets: Vec<TargetConfig>,
156 max_concurrency: usize,
158 base_output: PathBuf,
160}
161
162impl ParallelExecutor {
163 pub fn new(
165 base_command: BenchCommand,
166 targets: Vec<TargetConfig>,
167 max_concurrency: usize,
168 ) -> Self {
169 let base_output = base_command.output.clone();
170 Self {
171 base_command,
172 targets,
173 max_concurrency,
174 base_output,
175 }
176 }
177
178 pub async fn execute_all(&self) -> Result<AggregatedResults> {
180 let total_targets = self.targets.len();
181 TerminalReporter::print_progress(&format!(
182 "Starting parallel execution for {} targets (max concurrency: {})",
183 total_targets, self.max_concurrency
184 ));
185
186 if !K6Executor::is_k6_installed() {
188 TerminalReporter::print_error("k6 is not installed");
189 TerminalReporter::print_warning(
190 "Install k6 from: https://k6.io/docs/get-started/installation/",
191 );
192 return Err(BenchError::K6NotFound);
193 }
194
195 TerminalReporter::print_progress("Loading OpenAPI specification(s)...");
197 let merged_spec = self.base_command.load_and_merge_specs().await?;
198 let parser = SpecParser::from_spec(merged_spec);
199 TerminalReporter::print_success("Specification(s) loaded");
200
201 let operations = if let Some(filter) = &self.base_command.operations {
203 parser.filter_operations(filter)?
204 } else {
205 parser.get_operations()
206 };
207
208 if operations.is_empty() {
209 return Err(BenchError::Other("No operations found in spec".to_string()));
210 }
211
212 TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
213
214 TerminalReporter::print_progress("Generating request templates...");
216 let templates: Vec<_> = operations
217 .iter()
218 .map(RequestGenerator::generate_template)
219 .collect::<Result<Vec<_>>>()?;
220 TerminalReporter::print_success("Request templates generated");
221
222 let mut per_target_data: HashMap<
224 PathBuf,
225 (Vec<crate::request_gen::RequestTemplate>, Option<String>),
226 > = HashMap::new();
227 {
228 let mut unique_specs: Vec<PathBuf> = Vec::new();
229 for t in &self.targets {
230 if let Some(spec_path) = &t.spec {
231 if !unique_specs.contains(spec_path) {
232 unique_specs.push(spec_path.clone());
233 }
234 }
235 }
236 for spec_path in &unique_specs {
237 TerminalReporter::print_progress(&format!(
238 "Loading per-target spec: {}",
239 spec_path.display()
240 ));
241 match SpecParser::from_file(spec_path).await {
242 Ok(target_parser) => {
243 let target_ops = if let Some(filter) = &self.base_command.operations {
244 match target_parser.filter_operations(filter) {
245 Ok(ops) => ops,
246 Err(e) => {
247 TerminalReporter::print_warning(&format!(
248 "Failed to filter operations from {}: {}. Using shared spec.",
249 spec_path.display(),
250 e
251 ));
252 continue;
253 }
254 }
255 } else {
256 target_parser.get_operations()
257 };
258 let target_templates: Vec<_> = match target_ops
259 .iter()
260 .map(RequestGenerator::generate_template)
261 .collect::<Result<Vec<_>>>()
262 {
263 Ok(t) => t,
264 Err(e) => {
265 TerminalReporter::print_warning(&format!(
266 "Failed to generate templates from {}: {}. Using shared spec.",
267 spec_path.display(),
268 e
269 ));
270 continue;
271 }
272 };
273 let target_base_path = if let Some(cli_bp) = &self.base_command.base_path {
274 if cli_bp.is_empty() {
275 None
276 } else {
277 Some(cli_bp.clone())
278 }
279 } else {
280 target_parser.get_base_path()
281 };
282 TerminalReporter::print_success(&format!(
283 "Loaded {} operations from {}",
284 target_templates.len(),
285 spec_path.display()
286 ));
287 per_target_data
288 .insert(spec_path.clone(), (target_templates, target_base_path));
289 }
290 Err(e) => {
291 TerminalReporter::print_warning(&format!(
292 "Failed to load per-target spec {}: {}. Targets using this spec will use the shared spec.",
293 spec_path.display(),
294 e
295 ));
296 }
297 }
298 }
299 }
300
301 let base_headers = self.base_command.parse_headers()?;
303
304 let base_path = self.resolve_base_path(&parser);
306 if let Some(ref bp) = base_path {
307 TerminalReporter::print_progress(&format!("Using base path: {}", bp));
308 }
309
310 let scenario = LoadScenario::from_str(&self.base_command.scenario)
312 .map_err(BenchError::InvalidScenario)?;
313
314 let duration_secs_val = BenchCommand::parse_duration(&self.base_command.duration)?;
315
316 let security_testing_enabled_val =
318 self.base_command.security_test || self.base_command.wafbench_dir.is_some();
319
320 let has_advanced_features = self.base_command.data_file.is_some()
322 || self.base_command.error_rate.is_some()
323 || self.base_command.security_test
324 || self.base_command.parallel_create.is_some()
325 || self.base_command.wafbench_dir.is_some();
326
327 let enhancement_code = if has_advanced_features {
328 let dummy_script = "export const options = {};";
329 let enhanced = self.base_command.generate_enhanced_script(dummy_script)?;
330 if let Some(pos) = enhanced.find("export const options") {
331 enhanced[..pos].to_string()
332 } else {
333 String::new()
334 }
335 } else {
336 String::new()
337 };
338
339 let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
341 let multi_progress = MultiProgress::new();
342
343 let progress_bars: Vec<ProgressBar> = (0..total_targets)
345 .map(|i| {
346 let pb = multi_progress.add(ProgressBar::new(1));
347 pb.set_style(
348 ProgressStyle::default_bar()
349 .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
350 .unwrap(),
351 );
352 pb.set_message(format!("Target {}", i + 1));
353 pb
354 })
355 .collect();
356
357 let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
359
360 for (index, target) in self.targets.iter().enumerate() {
361 let target = target.clone();
362 let duration = self.base_command.duration.clone();
364 let vus = self.base_command.vus;
365 let scenario_str = self.base_command.scenario.clone();
366 let operations = self.base_command.operations.clone();
367 let auth = self.base_command.auth.clone();
368 let headers = self.base_command.headers.clone();
369 let threshold_percentile = self.base_command.threshold_percentile.clone();
370 let threshold_ms = self.base_command.threshold_ms;
371 let max_error_rate = self.base_command.max_error_rate;
372 let verbose = self.base_command.verbose;
373 let skip_tls_verify = self.base_command.skip_tls_verify;
374 let chunked_request_bodies = self.base_command.chunked_request_bodies;
375 let target_rps = self.base_command.target_rps;
376 let no_keep_alive = self.base_command.no_keep_alive;
377
378 let (templates, base_path) = if let Some(spec_path) = &target.spec {
380 if let Some((t, bp)) = per_target_data.get(spec_path) {
381 (t.clone(), bp.clone())
382 } else {
383 (templates.clone(), base_path.clone())
384 }
385 } else {
386 (templates.clone(), base_path.clone())
387 };
388
389 let base_headers = base_headers.clone();
390 let scenario = scenario.clone();
391 let duration_secs = duration_secs_val;
392 let base_output = self.base_output.clone();
393 let semaphore = semaphore.clone();
394 let progress_bar = progress_bars[index].clone();
395 let target_index = index;
396 let security_testing_enabled = security_testing_enabled_val;
397 let enhancement_code = enhancement_code.clone();
398
399 let handle = tokio::spawn(async move {
400 let _permit = semaphore.acquire().await.map_err(|e| {
402 BenchError::Other(format!("Failed to acquire semaphore: {}", e))
403 })?;
404
405 progress_bar.set_message(format!("Testing {}", target.url));
406
407 let result = Self::execute_single_target_internal(
409 &duration,
410 vus,
411 &scenario_str,
412 &operations,
413 &auth,
414 &headers,
415 &threshold_percentile,
416 threshold_ms,
417 max_error_rate,
418 verbose,
419 skip_tls_verify,
420 base_path.as_ref(),
421 &target,
422 target_index,
423 &templates,
424 &base_headers,
425 &scenario,
426 duration_secs,
427 &base_output,
428 security_testing_enabled,
429 chunked_request_bodies,
430 target_rps,
431 no_keep_alive,
432 &enhancement_code,
433 )
434 .await;
435
436 progress_bar.inc(1);
437 progress_bar.finish_with_message(format!("Completed {}", target.url));
438
439 result
440 });
441
442 handles.push(handle);
443 }
444
445 let mut target_results = Vec::new();
447 for (index, handle) in handles.into_iter().enumerate() {
448 match handle.await {
449 Ok(Ok(result)) => {
450 target_results.push(result);
451 }
452 Ok(Err(e)) => {
453 let target_url = self.targets[index].url.clone();
455 target_results.push(TargetResult {
456 target_url: target_url.clone(),
457 target_index: index,
458 results: K6Results::default(),
459 output_dir: self.base_output.join(format!("target_{}", index + 1)),
460 success: false,
461 error: Some(e.to_string()),
462 });
463 }
464 Err(e) => {
465 let target_url = self.targets[index].url.clone();
467 target_results.push(TargetResult {
468 target_url: target_url.clone(),
469 target_index: index,
470 results: K6Results::default(),
471 output_dir: self.base_output.join(format!("target_{}", index + 1)),
472 success: false,
473 error: Some(format!("Task join error: {}", e)),
474 });
475 }
476 }
477 }
478
479 target_results.sort_by_key(|r| r.target_index);
481
482 let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
484
485 let successful_targets = target_results.iter().filter(|r| r.success).count();
486 let failed_targets = total_targets - successful_targets;
487
488 Ok(AggregatedResults {
489 target_results,
490 total_targets,
491 successful_targets,
492 failed_targets,
493 aggregated_metrics,
494 })
495 }
496
497 fn resolve_base_path(&self, parser: &SpecParser) -> Option<String> {
499 if let Some(cli_base_path) = &self.base_command.base_path {
501 if cli_base_path.is_empty() {
502 return None;
503 }
504 return Some(cli_base_path.clone());
505 }
506 parser.get_base_path()
508 }
509
510 #[allow(clippy::too_many_arguments)]
512 async fn execute_single_target_internal(
513 _duration: &str,
514 vus: u32,
515 _scenario_str: &str,
516 _operations: &Option<String>,
517 auth: &Option<String>,
518 _headers: &Option<String>,
519 threshold_percentile: &str,
520 threshold_ms: u64,
521 max_error_rate: f64,
522 verbose: bool,
523 skip_tls_verify: bool,
524 base_path: Option<&String>,
525 target: &TargetConfig,
526 target_index: usize,
527 templates: &[crate::request_gen::RequestTemplate],
528 base_headers: &HashMap<String, String>,
529 scenario: &LoadScenario,
530 duration_secs: u64,
531 base_output: &Path,
532 security_testing_enabled: bool,
533 chunked_request_bodies: bool,
534 target_rps: Option<u32>,
535 no_keep_alive: bool,
536 enhancement_code: &str,
537 ) -> Result<TargetResult> {
538 let mut custom_headers = base_headers.clone();
540 if let Some(target_headers) = &target.headers {
541 custom_headers.extend(target_headers.clone());
542 }
543
544 let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
546
547 let k6_config = K6Config {
549 target_url: target.url.clone(),
550 base_path: base_path.cloned(),
551 scenario: scenario.clone(),
552 duration_secs,
553 max_vus: vus,
554 threshold_percentile: threshold_percentile.to_string(),
555 threshold_ms,
556 max_error_rate,
557 auth_header,
558 custom_headers,
559 skip_tls_verify,
560 security_testing_enabled,
561 chunked_request_bodies,
562 target_rps,
563 no_keep_alive,
564 };
565
566 let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
568 let mut script = generator.generate()?;
569
570 if !enhancement_code.is_empty() {
572 if let Some(pos) = script.find("export const options") {
573 script.insert_str(pos, enhancement_code);
574 }
575 }
576
577 let validation_errors = K6ScriptGenerator::validate_script(&script);
579 if !validation_errors.is_empty() {
580 return Err(BenchError::Other(format!(
581 "Script validation failed for target {}: {}",
582 target.url,
583 validation_errors.join(", ")
584 )));
585 }
586
587 let output_dir = base_output.join(format!("target_{}", target_index + 1));
589 std::fs::create_dir_all(&output_dir)?;
590
591 let script_path = output_dir.join("k6-script.js");
593 std::fs::write(&script_path, script)?;
594
595 let api_port = 6565 + (target_index as u16) + 1; let executor = K6Executor::new()?;
599 let results = executor
600 .execute_with_port(&script_path, Some(&output_dir), verbose, Some(api_port))
601 .await;
602
603 match results {
604 Ok(k6_results) => Ok(TargetResult {
605 target_url: target.url.clone(),
606 target_index,
607 results: k6_results,
608 output_dir,
609 success: true,
610 error: None,
611 }),
612 Err(e) => Ok(TargetResult {
613 target_url: target.url.clone(),
614 target_index,
615 results: K6Results::default(),
616 output_dir,
617 success: false,
618 error: Some(e.to_string()),
619 }),
620 }
621 }
622}
623
624#[cfg(test)]
625mod tests {
626 use super::*;
627
628 #[test]
629 fn test_aggregated_metrics_from_results() {
630 let results = vec![
631 TargetResult {
632 target_url: "http://api1.com".to_string(),
633 target_index: 0,
634 results: K6Results {
635 total_requests: 100,
636 failed_requests: 5,
637 avg_duration_ms: 100.0,
638 p95_duration_ms: 200.0,
639 p99_duration_ms: 300.0,
640 ..Default::default()
641 },
642 output_dir: PathBuf::from("output1"),
643 success: true,
644 error: None,
645 },
646 TargetResult {
647 target_url: "http://api2.com".to_string(),
648 target_index: 1,
649 results: K6Results {
650 total_requests: 200,
651 failed_requests: 10,
652 avg_duration_ms: 150.0,
653 p95_duration_ms: 250.0,
654 p99_duration_ms: 350.0,
655 ..Default::default()
656 },
657 output_dir: PathBuf::from("output2"),
658 success: true,
659 error: None,
660 },
661 ];
662
663 let metrics = AggregatedMetrics::from_results(&results);
664 assert_eq!(metrics.total_requests, 300);
665 assert_eq!(metrics.total_failed_requests, 15);
666 assert_eq!(metrics.avg_duration_ms, 125.0); }
668
669 #[test]
670 fn test_aggregated_metrics_with_failed_targets() {
671 let results = vec![
672 TargetResult {
673 target_url: "http://api1.com".to_string(),
674 target_index: 0,
675 results: K6Results {
676 total_requests: 100,
677 failed_requests: 5,
678 avg_duration_ms: 100.0,
679 p95_duration_ms: 200.0,
680 p99_duration_ms: 300.0,
681 ..Default::default()
682 },
683 output_dir: PathBuf::from("output1"),
684 success: true,
685 error: None,
686 },
687 TargetResult {
688 target_url: "http://api2.com".to_string(),
689 target_index: 1,
690 results: K6Results::default(),
691 output_dir: PathBuf::from("output2"),
692 success: false,
693 error: Some("Network error".to_string()),
694 },
695 ];
696
697 let metrics = AggregatedMetrics::from_results(&results);
698 assert_eq!(metrics.total_requests, 100);
700 assert_eq!(metrics.total_failed_requests, 5);
701 assert_eq!(metrics.avg_duration_ms, 100.0);
702 }
703
704 #[test]
705 fn test_aggregated_metrics_empty_results() {
706 let results = vec![];
707 let metrics = AggregatedMetrics::from_results(&results);
708 assert_eq!(metrics.total_requests, 0);
709 assert_eq!(metrics.total_failed_requests, 0);
710 assert_eq!(metrics.avg_duration_ms, 0.0);
711 assert_eq!(metrics.error_rate, 0.0);
712 }
713
714 #[test]
715 fn test_aggregated_metrics_error_rate_calculation() {
716 let results = vec![TargetResult {
717 target_url: "http://api1.com".to_string(),
718 target_index: 0,
719 results: K6Results {
720 total_requests: 1000,
721 failed_requests: 50,
722 avg_duration_ms: 100.0,
723 p95_duration_ms: 200.0,
724 p99_duration_ms: 300.0,
725 ..Default::default()
726 },
727 output_dir: PathBuf::from("output1"),
728 success: true,
729 error: None,
730 }];
731
732 let metrics = AggregatedMetrics::from_results(&results);
733 assert_eq!(metrics.error_rate, 5.0); }
735
736 #[test]
737 fn test_aggregated_metrics_p95_p99_calculation() {
738 let results = vec![
739 TargetResult {
740 target_url: "http://api1.com".to_string(),
741 target_index: 0,
742 results: K6Results {
743 total_requests: 100,
744 failed_requests: 0,
745 avg_duration_ms: 100.0,
746 p95_duration_ms: 150.0,
747 p99_duration_ms: 200.0,
748 ..Default::default()
749 },
750 output_dir: PathBuf::from("output1"),
751 success: true,
752 error: None,
753 },
754 TargetResult {
755 target_url: "http://api2.com".to_string(),
756 target_index: 1,
757 results: K6Results {
758 total_requests: 100,
759 failed_requests: 0,
760 avg_duration_ms: 200.0,
761 p95_duration_ms: 250.0,
762 p99_duration_ms: 300.0,
763 ..Default::default()
764 },
765 output_dir: PathBuf::from("output2"),
766 success: true,
767 error: None,
768 },
769 TargetResult {
770 target_url: "http://api3.com".to_string(),
771 target_index: 2,
772 results: K6Results {
773 total_requests: 100,
774 failed_requests: 0,
775 avg_duration_ms: 300.0,
776 p95_duration_ms: 350.0,
777 p99_duration_ms: 400.0,
778 ..Default::default()
779 },
780 output_dir: PathBuf::from("output3"),
781 success: true,
782 error: None,
783 },
784 ];
785
786 let metrics = AggregatedMetrics::from_results(&results);
787 assert_eq!(metrics.p95_duration_ms, 350.0);
790 assert_eq!(metrics.p99_duration_ms, 400.0);
791 }
792}