1use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
24#[derive(Debug, Clone)]
26pub struct TargetResult {
27 pub target_url: String,
29 pub target_index: usize,
31 pub results: K6Results,
33 pub output_dir: PathBuf,
35 pub success: bool,
37 pub error: Option<String>,
39}
40
41#[derive(Debug, Clone)]
43pub struct AggregatedResults {
44 pub target_results: Vec<TargetResult>,
46 pub total_targets: usize,
48 pub successful_targets: usize,
49 pub failed_targets: usize,
50 pub aggregated_metrics: AggregatedMetrics,
52}
53
54#[derive(Debug, Clone)]
56pub struct AggregatedMetrics {
57 pub total_requests: u64,
59 pub total_failed_requests: u64,
61 pub avg_duration_ms: f64,
63 pub p95_duration_ms: f64,
65 pub p99_duration_ms: f64,
67 pub error_rate: f64,
69 pub total_rps: f64,
71 pub avg_rps: f64,
73 pub total_vus_max: u32,
75 pub total_connections_opened: u64,
79 pub total_iterations_completed: u64,
81}
82
83impl AggregatedMetrics {
84 fn from_results(results: &[TargetResult]) -> Self {
86 let mut total_requests = 0u64;
87 let mut total_failed_requests = 0u64;
88 let mut durations = Vec::new();
89 let mut p95_values = Vec::new();
90 let mut p99_values = Vec::new();
91 let mut total_rps = 0.0f64;
92 let mut total_vus_max = 0u32;
93 let mut total_connections_opened = 0u64;
94 let mut total_iterations_completed = 0u64;
95 let mut successful_count = 0usize;
96
97 for result in results {
98 if result.success {
99 total_requests += result.results.total_requests;
100 total_failed_requests += result.results.failed_requests;
101 durations.push(result.results.avg_duration_ms);
102 p95_values.push(result.results.p95_duration_ms);
103 p99_values.push(result.results.p99_duration_ms);
104 total_rps += result.results.rps;
105 total_vus_max += result.results.vus_max;
106 total_connections_opened += result.results.tcp_connect_samples;
107 total_iterations_completed += result.results.iterations_completed;
108 successful_count += 1;
109 }
110 }
111
112 let avg_duration_ms = if !durations.is_empty() {
113 durations.iter().sum::<f64>() / durations.len() as f64
114 } else {
115 0.0
116 };
117
118 let p95_duration_ms = if !p95_values.is_empty() {
119 p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
120 let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
121 p95_values[index.min(p95_values.len() - 1)]
122 } else {
123 0.0
124 };
125
126 let p99_duration_ms = if !p99_values.is_empty() {
127 p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
128 let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
129 p99_values[index.min(p99_values.len() - 1)]
130 } else {
131 0.0
132 };
133
134 let error_rate = if total_requests > 0 {
135 (total_failed_requests as f64 / total_requests as f64) * 100.0
136 } else {
137 0.0
138 };
139
140 let avg_rps = if successful_count > 0 {
141 total_rps / successful_count as f64
142 } else {
143 0.0
144 };
145
146 Self {
147 total_requests,
148 total_failed_requests,
149 avg_duration_ms,
150 p95_duration_ms,
151 p99_duration_ms,
152 error_rate,
153 total_rps,
154 avg_rps,
155 total_vus_max,
156 total_connections_opened,
157 total_iterations_completed,
158 }
159 }
160}
161
162pub struct ParallelExecutor {
164 base_command: BenchCommand,
166 targets: Vec<TargetConfig>,
168 max_concurrency: usize,
170 base_output: PathBuf,
172}
173
174impl ParallelExecutor {
175 pub fn new(
177 base_command: BenchCommand,
178 targets: Vec<TargetConfig>,
179 max_concurrency: usize,
180 ) -> Self {
181 let base_output = base_command.output.clone();
182 Self {
183 base_command,
184 targets,
185 max_concurrency,
186 base_output,
187 }
188 }
189
190 pub async fn execute_all(&self) -> Result<AggregatedResults> {
192 let total_targets = self.targets.len();
193 TerminalReporter::print_progress(&format!(
194 "Starting parallel execution for {} targets (max concurrency: {})",
195 total_targets, self.max_concurrency
196 ));
197
198 if !K6Executor::is_k6_installed() {
200 TerminalReporter::print_error("k6 is not installed");
201 TerminalReporter::print_warning(
202 "Install k6 from: https://k6.io/docs/get-started/installation/",
203 );
204 return Err(BenchError::K6NotFound);
205 }
206
207 TerminalReporter::print_progress("Loading OpenAPI specification(s)...");
209 let merged_spec = self.base_command.load_and_merge_specs().await?;
210 let parser = SpecParser::from_spec(merged_spec);
211 TerminalReporter::print_success("Specification(s) loaded");
212
213 let operations = if let Some(filter) = &self.base_command.operations {
215 parser.filter_operations(filter)?
216 } else {
217 parser.get_operations()
218 };
219
220 if operations.is_empty() {
221 return Err(BenchError::Other("No operations found in spec".to_string()));
222 }
223
224 TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
225
226 TerminalReporter::print_progress("Generating request templates...");
228 let templates: Vec<_> = operations
229 .iter()
230 .map(RequestGenerator::generate_template)
231 .collect::<Result<Vec<_>>>()?;
232 TerminalReporter::print_success("Request templates generated");
233
234 let mut per_target_data: HashMap<
236 PathBuf,
237 (Vec<crate::request_gen::RequestTemplate>, Option<String>),
238 > = HashMap::new();
239 {
240 let mut unique_specs: Vec<PathBuf> = Vec::new();
241 for t in &self.targets {
242 if let Some(spec_path) = &t.spec {
243 if !unique_specs.contains(spec_path) {
244 unique_specs.push(spec_path.clone());
245 }
246 }
247 }
248 for spec_path in &unique_specs {
249 TerminalReporter::print_progress(&format!(
250 "Loading per-target spec: {}",
251 spec_path.display()
252 ));
253 match SpecParser::from_file(spec_path).await {
254 Ok(target_parser) => {
255 let target_ops = if let Some(filter) = &self.base_command.operations {
256 match target_parser.filter_operations(filter) {
257 Ok(ops) => ops,
258 Err(e) => {
259 TerminalReporter::print_warning(&format!(
260 "Failed to filter operations from {}: {}. Using shared spec.",
261 spec_path.display(),
262 e
263 ));
264 continue;
265 }
266 }
267 } else {
268 target_parser.get_operations()
269 };
270 let target_templates: Vec<_> = match target_ops
271 .iter()
272 .map(RequestGenerator::generate_template)
273 .collect::<Result<Vec<_>>>()
274 {
275 Ok(t) => t,
276 Err(e) => {
277 TerminalReporter::print_warning(&format!(
278 "Failed to generate templates from {}: {}. Using shared spec.",
279 spec_path.display(),
280 e
281 ));
282 continue;
283 }
284 };
285 let target_base_path = if let Some(cli_bp) = &self.base_command.base_path {
286 if cli_bp.is_empty() {
287 None
288 } else {
289 Some(cli_bp.clone())
290 }
291 } else {
292 target_parser.get_base_path()
293 };
294 TerminalReporter::print_success(&format!(
295 "Loaded {} operations from {}",
296 target_templates.len(),
297 spec_path.display()
298 ));
299 per_target_data
300 .insert(spec_path.clone(), (target_templates, target_base_path));
301 }
302 Err(e) => {
303 TerminalReporter::print_warning(&format!(
304 "Failed to load per-target spec {}: {}. Targets using this spec will use the shared spec.",
305 spec_path.display(),
306 e
307 ));
308 }
309 }
310 }
311 }
312
313 let base_headers = self.base_command.parse_headers()?;
315
316 let base_path = self.resolve_base_path(&parser);
318 if let Some(ref bp) = base_path {
319 TerminalReporter::print_progress(&format!("Using base path: {}", bp));
320 }
321
322 let scenario = LoadScenario::from_str(&self.base_command.scenario)
324 .map_err(BenchError::InvalidScenario)?;
325
326 let duration_secs_val = BenchCommand::parse_duration(&self.base_command.duration)?;
327
328 let security_testing_enabled_val =
330 self.base_command.security_test || self.base_command.wafbench_dir.is_some();
331
332 let has_advanced_features = self.base_command.data_file.is_some()
334 || self.base_command.error_rate.is_some()
335 || self.base_command.security_test
336 || self.base_command.parallel_create.is_some()
337 || self.base_command.wafbench_dir.is_some();
338
339 let enhancement_code = if has_advanced_features {
340 let dummy_script = "export const options = {};";
341 let enhanced = self.base_command.generate_enhanced_script(dummy_script)?;
342 if let Some(pos) = enhanced.find("export const options") {
343 enhanced[..pos].to_string()
344 } else {
345 String::new()
346 }
347 } else {
348 String::new()
349 };
350
351 let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
353 let multi_progress = MultiProgress::new();
354
355 let progress_bars: Vec<ProgressBar> = (0..total_targets)
357 .map(|i| {
358 let pb = multi_progress.add(ProgressBar::new(1));
359 pb.set_style(
360 ProgressStyle::default_bar()
361 .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
362 .unwrap(),
363 );
364 pb.set_message(format!("Target {}", i + 1));
365 pb
366 })
367 .collect();
368
369 let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
371
372 for (index, target) in self.targets.iter().enumerate() {
373 let target = target.clone();
374 let duration = self.base_command.duration.clone();
376 let vus = self.base_command.vus;
377 let scenario_str = self.base_command.scenario.clone();
378 let operations = self.base_command.operations.clone();
379 let auth = self.base_command.auth.clone();
380 let headers = self.base_command.headers.clone();
381 let threshold_percentile = self.base_command.threshold_percentile.clone();
382 let threshold_ms = self.base_command.threshold_ms;
383 let max_error_rate = self.base_command.max_error_rate;
384 let verbose = self.base_command.verbose;
385 let skip_tls_verify = self.base_command.skip_tls_verify;
386 let chunked_request_bodies = self.base_command.chunked_request_bodies;
387 let target_rps = self.base_command.target_rps;
388 let no_keep_alive = self.base_command.no_keep_alive;
389
390 let (templates, base_path) = if let Some(spec_path) = &target.spec {
392 if let Some((t, bp)) = per_target_data.get(spec_path) {
393 (t.clone(), bp.clone())
394 } else {
395 (templates.clone(), base_path.clone())
396 }
397 } else {
398 (templates.clone(), base_path.clone())
399 };
400
401 let base_headers = base_headers.clone();
402 let scenario = scenario.clone();
403 let duration_secs = duration_secs_val;
404 let base_output = self.base_output.clone();
405 let semaphore = semaphore.clone();
406 let progress_bar = progress_bars[index].clone();
407 let target_index = index;
408 let security_testing_enabled = security_testing_enabled_val;
409 let enhancement_code = enhancement_code.clone();
410
411 let handle = tokio::spawn(async move {
412 let _permit = semaphore.acquire().await.map_err(|e| {
414 BenchError::Other(format!("Failed to acquire semaphore: {}", e))
415 })?;
416
417 progress_bar.set_message(format!("Testing {}", target.url));
418
419 let result = Self::execute_single_target_internal(
421 &duration,
422 vus,
423 &scenario_str,
424 &operations,
425 &auth,
426 &headers,
427 &threshold_percentile,
428 threshold_ms,
429 max_error_rate,
430 verbose,
431 skip_tls_verify,
432 base_path.as_ref(),
433 &target,
434 target_index,
435 &templates,
436 &base_headers,
437 &scenario,
438 duration_secs,
439 &base_output,
440 security_testing_enabled,
441 chunked_request_bodies,
442 target_rps,
443 no_keep_alive,
444 &enhancement_code,
445 )
446 .await;
447
448 progress_bar.inc(1);
449 progress_bar.finish_with_message(format!("Completed {}", target.url));
450
451 result
452 });
453
454 handles.push(handle);
455 }
456
457 let mut target_results = Vec::new();
459 for (index, handle) in handles.into_iter().enumerate() {
460 match handle.await {
461 Ok(Ok(result)) => {
462 target_results.push(result);
463 }
464 Ok(Err(e)) => {
465 let target_url = self.targets[index].url.clone();
467 target_results.push(TargetResult {
468 target_url: target_url.clone(),
469 target_index: index,
470 results: K6Results::default(),
471 output_dir: self.base_output.join(format!("target_{}", index + 1)),
472 success: false,
473 error: Some(e.to_string()),
474 });
475 }
476 Err(e) => {
477 let target_url = self.targets[index].url.clone();
479 target_results.push(TargetResult {
480 target_url: target_url.clone(),
481 target_index: index,
482 results: K6Results::default(),
483 output_dir: self.base_output.join(format!("target_{}", index + 1)),
484 success: false,
485 error: Some(format!("Task join error: {}", e)),
486 });
487 }
488 }
489 }
490
491 target_results.sort_by_key(|r| r.target_index);
493
494 let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
496
497 let successful_targets = target_results.iter().filter(|r| r.success).count();
498 let failed_targets = total_targets - successful_targets;
499
500 Ok(AggregatedResults {
501 target_results,
502 total_targets,
503 successful_targets,
504 failed_targets,
505 aggregated_metrics,
506 })
507 }
508
509 fn resolve_base_path(&self, parser: &SpecParser) -> Option<String> {
511 if let Some(cli_base_path) = &self.base_command.base_path {
513 if cli_base_path.is_empty() {
514 return None;
515 }
516 return Some(cli_base_path.clone());
517 }
518 parser.get_base_path()
520 }
521
522 #[allow(clippy::too_many_arguments)]
524 async fn execute_single_target_internal(
525 _duration: &str,
526 vus: u32,
527 _scenario_str: &str,
528 _operations: &Option<String>,
529 auth: &Option<String>,
530 _headers: &Option<String>,
531 threshold_percentile: &str,
532 threshold_ms: u64,
533 max_error_rate: f64,
534 verbose: bool,
535 skip_tls_verify: bool,
536 base_path: Option<&String>,
537 target: &TargetConfig,
538 target_index: usize,
539 templates: &[crate::request_gen::RequestTemplate],
540 base_headers: &HashMap<String, String>,
541 scenario: &LoadScenario,
542 duration_secs: u64,
543 base_output: &Path,
544 security_testing_enabled: bool,
545 chunked_request_bodies: bool,
546 target_rps: Option<u32>,
547 no_keep_alive: bool,
548 enhancement_code: &str,
549 ) -> Result<TargetResult> {
550 let mut custom_headers = base_headers.clone();
552 if let Some(target_headers) = &target.headers {
553 custom_headers.extend(target_headers.clone());
554 }
555
556 let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
558
559 let k6_config = K6Config {
561 target_url: target.url.clone(),
562 base_path: base_path.cloned(),
563 scenario: scenario.clone(),
564 duration_secs,
565 max_vus: vus,
566 threshold_percentile: threshold_percentile.to_string(),
567 threshold_ms,
568 max_error_rate,
569 auth_header,
570 custom_headers,
571 skip_tls_verify,
572 security_testing_enabled,
573 chunked_request_bodies,
574 target_rps,
575 no_keep_alive,
576 };
577
578 let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
580 let mut script = generator.generate()?;
581
582 if !enhancement_code.is_empty() {
584 if let Some(pos) = script.find("export const options") {
585 script.insert_str(pos, enhancement_code);
586 }
587 }
588
589 let validation_errors = K6ScriptGenerator::validate_script(&script);
591 if !validation_errors.is_empty() {
592 return Err(BenchError::Other(format!(
593 "Script validation failed for target {}: {}",
594 target.url,
595 validation_errors.join(", ")
596 )));
597 }
598
599 let output_dir = base_output.join(format!("target_{}", target_index + 1));
601 std::fs::create_dir_all(&output_dir)?;
602
603 let script_path = output_dir.join("k6-script.js");
605 std::fs::write(&script_path, script)?;
606
607 let api_port = 6565 + (target_index as u16) + 1; let executor = K6Executor::new()?;
611 let results = executor
612 .execute_with_port(&script_path, Some(&output_dir), verbose, Some(api_port))
613 .await;
614
615 match results {
616 Ok(k6_results) => Ok(TargetResult {
617 target_url: target.url.clone(),
618 target_index,
619 results: k6_results,
620 output_dir,
621 success: true,
622 error: None,
623 }),
624 Err(e) => Ok(TargetResult {
625 target_url: target.url.clone(),
626 target_index,
627 results: K6Results::default(),
628 output_dir,
629 success: false,
630 error: Some(e.to_string()),
631 }),
632 }
633 }
634}
635
636#[cfg(test)]
637mod tests {
638 use super::*;
639
640 #[test]
641 fn test_aggregated_metrics_from_results() {
642 let results = vec![
643 TargetResult {
644 target_url: "http://api1.com".to_string(),
645 target_index: 0,
646 results: K6Results {
647 total_requests: 100,
648 failed_requests: 5,
649 avg_duration_ms: 100.0,
650 p95_duration_ms: 200.0,
651 p99_duration_ms: 300.0,
652 ..Default::default()
653 },
654 output_dir: PathBuf::from("output1"),
655 success: true,
656 error: None,
657 },
658 TargetResult {
659 target_url: "http://api2.com".to_string(),
660 target_index: 1,
661 results: K6Results {
662 total_requests: 200,
663 failed_requests: 10,
664 avg_duration_ms: 150.0,
665 p95_duration_ms: 250.0,
666 p99_duration_ms: 350.0,
667 ..Default::default()
668 },
669 output_dir: PathBuf::from("output2"),
670 success: true,
671 error: None,
672 },
673 ];
674
675 let metrics = AggregatedMetrics::from_results(&results);
676 assert_eq!(metrics.total_requests, 300);
677 assert_eq!(metrics.total_failed_requests, 15);
678 assert_eq!(metrics.avg_duration_ms, 125.0); }
680
681 #[test]
682 fn test_aggregated_metrics_with_failed_targets() {
683 let results = vec![
684 TargetResult {
685 target_url: "http://api1.com".to_string(),
686 target_index: 0,
687 results: K6Results {
688 total_requests: 100,
689 failed_requests: 5,
690 avg_duration_ms: 100.0,
691 p95_duration_ms: 200.0,
692 p99_duration_ms: 300.0,
693 ..Default::default()
694 },
695 output_dir: PathBuf::from("output1"),
696 success: true,
697 error: None,
698 },
699 TargetResult {
700 target_url: "http://api2.com".to_string(),
701 target_index: 1,
702 results: K6Results::default(),
703 output_dir: PathBuf::from("output2"),
704 success: false,
705 error: Some("Network error".to_string()),
706 },
707 ];
708
709 let metrics = AggregatedMetrics::from_results(&results);
710 assert_eq!(metrics.total_requests, 100);
712 assert_eq!(metrics.total_failed_requests, 5);
713 assert_eq!(metrics.avg_duration_ms, 100.0);
714 }
715
716 #[test]
717 fn test_aggregated_metrics_empty_results() {
718 let results = vec![];
719 let metrics = AggregatedMetrics::from_results(&results);
720 assert_eq!(metrics.total_requests, 0);
721 assert_eq!(metrics.total_failed_requests, 0);
722 assert_eq!(metrics.avg_duration_ms, 0.0);
723 assert_eq!(metrics.error_rate, 0.0);
724 }
725
726 #[test]
727 fn test_aggregated_metrics_error_rate_calculation() {
728 let results = vec![TargetResult {
729 target_url: "http://api1.com".to_string(),
730 target_index: 0,
731 results: K6Results {
732 total_requests: 1000,
733 failed_requests: 50,
734 avg_duration_ms: 100.0,
735 p95_duration_ms: 200.0,
736 p99_duration_ms: 300.0,
737 ..Default::default()
738 },
739 output_dir: PathBuf::from("output1"),
740 success: true,
741 error: None,
742 }];
743
744 let metrics = AggregatedMetrics::from_results(&results);
745 assert_eq!(metrics.error_rate, 5.0); }
747
748 #[test]
749 fn test_aggregated_metrics_p95_p99_calculation() {
750 let results = vec![
751 TargetResult {
752 target_url: "http://api1.com".to_string(),
753 target_index: 0,
754 results: K6Results {
755 total_requests: 100,
756 failed_requests: 0,
757 avg_duration_ms: 100.0,
758 p95_duration_ms: 150.0,
759 p99_duration_ms: 200.0,
760 ..Default::default()
761 },
762 output_dir: PathBuf::from("output1"),
763 success: true,
764 error: None,
765 },
766 TargetResult {
767 target_url: "http://api2.com".to_string(),
768 target_index: 1,
769 results: K6Results {
770 total_requests: 100,
771 failed_requests: 0,
772 avg_duration_ms: 200.0,
773 p95_duration_ms: 250.0,
774 p99_duration_ms: 300.0,
775 ..Default::default()
776 },
777 output_dir: PathBuf::from("output2"),
778 success: true,
779 error: None,
780 },
781 TargetResult {
782 target_url: "http://api3.com".to_string(),
783 target_index: 2,
784 results: K6Results {
785 total_requests: 100,
786 failed_requests: 0,
787 avg_duration_ms: 300.0,
788 p95_duration_ms: 350.0,
789 p99_duration_ms: 400.0,
790 ..Default::default()
791 },
792 output_dir: PathBuf::from("output3"),
793 success: true,
794 error: None,
795 },
796 ];
797
798 let metrics = AggregatedMetrics::from_results(&results);
799 assert_eq!(metrics.p95_duration_ms, 350.0);
802 assert_eq!(metrics.p99_duration_ms, 400.0);
803 }
804}