1use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
24#[derive(Debug, Clone)]
26pub struct TargetResult {
27 pub target_url: String,
29 pub target_index: usize,
31 pub results: K6Results,
33 pub output_dir: PathBuf,
35 pub success: bool,
37 pub error: Option<String>,
39}
40
41#[derive(Debug, Clone)]
43pub struct AggregatedResults {
44 pub target_results: Vec<TargetResult>,
46 pub total_targets: usize,
48 pub successful_targets: usize,
49 pub failed_targets: usize,
50 pub aggregated_metrics: AggregatedMetrics,
52}
53
54#[derive(Debug, Clone)]
56pub struct AggregatedMetrics {
57 pub total_requests: u64,
59 pub total_failed_requests: u64,
61 pub avg_duration_ms: f64,
63 pub p95_duration_ms: f64,
65 pub p99_duration_ms: f64,
67 pub error_rate: f64,
69 pub total_rps: f64,
71 pub avg_rps: f64,
73 pub total_vus_max: u32,
75}
76
77impl AggregatedMetrics {
78 fn from_results(results: &[TargetResult]) -> Self {
80 let mut total_requests = 0u64;
81 let mut total_failed_requests = 0u64;
82 let mut durations = Vec::new();
83 let mut p95_values = Vec::new();
84 let mut p99_values = Vec::new();
85 let mut total_rps = 0.0f64;
86 let mut total_vus_max = 0u32;
87 let mut successful_count = 0usize;
88
89 for result in results {
90 if result.success {
91 total_requests += result.results.total_requests;
92 total_failed_requests += result.results.failed_requests;
93 durations.push(result.results.avg_duration_ms);
94 p95_values.push(result.results.p95_duration_ms);
95 p99_values.push(result.results.p99_duration_ms);
96 total_rps += result.results.rps;
97 total_vus_max += result.results.vus_max;
98 successful_count += 1;
99 }
100 }
101
102 let avg_duration_ms = if !durations.is_empty() {
103 durations.iter().sum::<f64>() / durations.len() as f64
104 } else {
105 0.0
106 };
107
108 let p95_duration_ms = if !p95_values.is_empty() {
109 p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
110 let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
111 p95_values[index.min(p95_values.len() - 1)]
112 } else {
113 0.0
114 };
115
116 let p99_duration_ms = if !p99_values.is_empty() {
117 p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
118 let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
119 p99_values[index.min(p99_values.len() - 1)]
120 } else {
121 0.0
122 };
123
124 let error_rate = if total_requests > 0 {
125 (total_failed_requests as f64 / total_requests as f64) * 100.0
126 } else {
127 0.0
128 };
129
130 let avg_rps = if successful_count > 0 {
131 total_rps / successful_count as f64
132 } else {
133 0.0
134 };
135
136 Self {
137 total_requests,
138 total_failed_requests,
139 avg_duration_ms,
140 p95_duration_ms,
141 p99_duration_ms,
142 error_rate,
143 total_rps,
144 avg_rps,
145 total_vus_max,
146 }
147 }
148}
149
150pub struct ParallelExecutor {
152 base_command: BenchCommand,
154 targets: Vec<TargetConfig>,
156 max_concurrency: usize,
158 base_output: PathBuf,
160}
161
162impl ParallelExecutor {
163 pub fn new(
165 base_command: BenchCommand,
166 targets: Vec<TargetConfig>,
167 max_concurrency: usize,
168 ) -> Self {
169 let base_output = base_command.output.clone();
170 Self {
171 base_command,
172 targets,
173 max_concurrency,
174 base_output,
175 }
176 }
177
178 pub async fn execute_all(&self) -> Result<AggregatedResults> {
180 let total_targets = self.targets.len();
181 TerminalReporter::print_progress(&format!(
182 "Starting parallel execution for {} targets (max concurrency: {})",
183 total_targets, self.max_concurrency
184 ));
185
186 if !K6Executor::is_k6_installed() {
188 TerminalReporter::print_error("k6 is not installed");
189 TerminalReporter::print_warning(
190 "Install k6 from: https://k6.io/docs/get-started/installation/",
191 );
192 return Err(BenchError::K6NotFound);
193 }
194
195 TerminalReporter::print_progress("Loading OpenAPI specification(s)...");
197 let merged_spec = self.base_command.load_and_merge_specs().await?;
198 let parser = SpecParser::from_spec(merged_spec);
199 TerminalReporter::print_success("Specification(s) loaded");
200
201 let operations = if let Some(filter) = &self.base_command.operations {
203 parser.filter_operations(filter)?
204 } else {
205 parser.get_operations()
206 };
207
208 if operations.is_empty() {
209 return Err(BenchError::Other("No operations found in spec".to_string()));
210 }
211
212 TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
213
214 TerminalReporter::print_progress("Generating request templates...");
216 let templates: Vec<_> = operations
217 .iter()
218 .map(RequestGenerator::generate_template)
219 .collect::<Result<Vec<_>>>()?;
220 TerminalReporter::print_success("Request templates generated");
221
222 let mut per_target_data: HashMap<
224 PathBuf,
225 (Vec<crate::request_gen::RequestTemplate>, Option<String>),
226 > = HashMap::new();
227 {
228 let mut unique_specs: Vec<PathBuf> = Vec::new();
229 for t in &self.targets {
230 if let Some(spec_path) = &t.spec {
231 if !unique_specs.contains(spec_path) {
232 unique_specs.push(spec_path.clone());
233 }
234 }
235 }
236 for spec_path in &unique_specs {
237 TerminalReporter::print_progress(&format!(
238 "Loading per-target spec: {}",
239 spec_path.display()
240 ));
241 match SpecParser::from_file(spec_path).await {
242 Ok(target_parser) => {
243 let target_ops = if let Some(filter) = &self.base_command.operations {
244 match target_parser.filter_operations(filter) {
245 Ok(ops) => ops,
246 Err(e) => {
247 TerminalReporter::print_warning(&format!(
248 "Failed to filter operations from {}: {}. Using shared spec.",
249 spec_path.display(),
250 e
251 ));
252 continue;
253 }
254 }
255 } else {
256 target_parser.get_operations()
257 };
258 let target_templates: Vec<_> = match target_ops
259 .iter()
260 .map(RequestGenerator::generate_template)
261 .collect::<Result<Vec<_>>>()
262 {
263 Ok(t) => t,
264 Err(e) => {
265 TerminalReporter::print_warning(&format!(
266 "Failed to generate templates from {}: {}. Using shared spec.",
267 spec_path.display(),
268 e
269 ));
270 continue;
271 }
272 };
273 let target_base_path = if let Some(cli_bp) = &self.base_command.base_path {
274 if cli_bp.is_empty() {
275 None
276 } else {
277 Some(cli_bp.clone())
278 }
279 } else {
280 target_parser.get_base_path()
281 };
282 TerminalReporter::print_success(&format!(
283 "Loaded {} operations from {}",
284 target_templates.len(),
285 spec_path.display()
286 ));
287 per_target_data
288 .insert(spec_path.clone(), (target_templates, target_base_path));
289 }
290 Err(e) => {
291 TerminalReporter::print_warning(&format!(
292 "Failed to load per-target spec {}: {}. Targets using this spec will use the shared spec.",
293 spec_path.display(),
294 e
295 ));
296 }
297 }
298 }
299 }
300
301 let base_headers = self.base_command.parse_headers()?;
303
304 let base_path = self.resolve_base_path(&parser);
306 if let Some(ref bp) = base_path {
307 TerminalReporter::print_progress(&format!("Using base path: {}", bp));
308 }
309
310 let scenario = LoadScenario::from_str(&self.base_command.scenario)
312 .map_err(BenchError::InvalidScenario)?;
313
314 let duration_secs_val = BenchCommand::parse_duration(&self.base_command.duration)?;
315
316 let security_testing_enabled_val =
318 self.base_command.security_test || self.base_command.wafbench_dir.is_some();
319
320 let has_advanced_features = self.base_command.data_file.is_some()
322 || self.base_command.error_rate.is_some()
323 || self.base_command.security_test
324 || self.base_command.parallel_create.is_some()
325 || self.base_command.wafbench_dir.is_some();
326
327 let enhancement_code = if has_advanced_features {
328 let dummy_script = "export const options = {};";
329 let enhanced = self.base_command.generate_enhanced_script(dummy_script)?;
330 if let Some(pos) = enhanced.find("export const options") {
331 enhanced[..pos].to_string()
332 } else {
333 String::new()
334 }
335 } else {
336 String::new()
337 };
338
339 let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
341 let multi_progress = MultiProgress::new();
342
343 let progress_bars: Vec<ProgressBar> = (0..total_targets)
345 .map(|i| {
346 let pb = multi_progress.add(ProgressBar::new(1));
347 pb.set_style(
348 ProgressStyle::default_bar()
349 .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
350 .unwrap(),
351 );
352 pb.set_message(format!("Target {}", i + 1));
353 pb
354 })
355 .collect();
356
357 let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
359
360 for (index, target) in self.targets.iter().enumerate() {
361 let target = target.clone();
362 let duration = self.base_command.duration.clone();
364 let vus = self.base_command.vus;
365 let scenario_str = self.base_command.scenario.clone();
366 let operations = self.base_command.operations.clone();
367 let auth = self.base_command.auth.clone();
368 let headers = self.base_command.headers.clone();
369 let threshold_percentile = self.base_command.threshold_percentile.clone();
370 let threshold_ms = self.base_command.threshold_ms;
371 let max_error_rate = self.base_command.max_error_rate;
372 let verbose = self.base_command.verbose;
373 let skip_tls_verify = self.base_command.skip_tls_verify;
374 let chunked_request_bodies = self.base_command.chunked_request_bodies;
375
376 let (templates, base_path) = if let Some(spec_path) = &target.spec {
378 if let Some((t, bp)) = per_target_data.get(spec_path) {
379 (t.clone(), bp.clone())
380 } else {
381 (templates.clone(), base_path.clone())
382 }
383 } else {
384 (templates.clone(), base_path.clone())
385 };
386
387 let base_headers = base_headers.clone();
388 let scenario = scenario.clone();
389 let duration_secs = duration_secs_val;
390 let base_output = self.base_output.clone();
391 let semaphore = semaphore.clone();
392 let progress_bar = progress_bars[index].clone();
393 let target_index = index;
394 let security_testing_enabled = security_testing_enabled_val;
395 let enhancement_code = enhancement_code.clone();
396
397 let handle = tokio::spawn(async move {
398 let _permit = semaphore.acquire().await.map_err(|e| {
400 BenchError::Other(format!("Failed to acquire semaphore: {}", e))
401 })?;
402
403 progress_bar.set_message(format!("Testing {}", target.url));
404
405 let result = Self::execute_single_target_internal(
407 &duration,
408 vus,
409 &scenario_str,
410 &operations,
411 &auth,
412 &headers,
413 &threshold_percentile,
414 threshold_ms,
415 max_error_rate,
416 verbose,
417 skip_tls_verify,
418 base_path.as_ref(),
419 &target,
420 target_index,
421 &templates,
422 &base_headers,
423 &scenario,
424 duration_secs,
425 &base_output,
426 security_testing_enabled,
427 chunked_request_bodies,
428 &enhancement_code,
429 )
430 .await;
431
432 progress_bar.inc(1);
433 progress_bar.finish_with_message(format!("Completed {}", target.url));
434
435 result
436 });
437
438 handles.push(handle);
439 }
440
441 let mut target_results = Vec::new();
443 for (index, handle) in handles.into_iter().enumerate() {
444 match handle.await {
445 Ok(Ok(result)) => {
446 target_results.push(result);
447 }
448 Ok(Err(e)) => {
449 let target_url = self.targets[index].url.clone();
451 target_results.push(TargetResult {
452 target_url: target_url.clone(),
453 target_index: index,
454 results: K6Results::default(),
455 output_dir: self.base_output.join(format!("target_{}", index + 1)),
456 success: false,
457 error: Some(e.to_string()),
458 });
459 }
460 Err(e) => {
461 let target_url = self.targets[index].url.clone();
463 target_results.push(TargetResult {
464 target_url: target_url.clone(),
465 target_index: index,
466 results: K6Results::default(),
467 output_dir: self.base_output.join(format!("target_{}", index + 1)),
468 success: false,
469 error: Some(format!("Task join error: {}", e)),
470 });
471 }
472 }
473 }
474
475 target_results.sort_by_key(|r| r.target_index);
477
478 let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
480
481 let successful_targets = target_results.iter().filter(|r| r.success).count();
482 let failed_targets = total_targets - successful_targets;
483
484 Ok(AggregatedResults {
485 target_results,
486 total_targets,
487 successful_targets,
488 failed_targets,
489 aggregated_metrics,
490 })
491 }
492
493 fn resolve_base_path(&self, parser: &SpecParser) -> Option<String> {
495 if let Some(cli_base_path) = &self.base_command.base_path {
497 if cli_base_path.is_empty() {
498 return None;
499 }
500 return Some(cli_base_path.clone());
501 }
502 parser.get_base_path()
504 }
505
506 #[allow(clippy::too_many_arguments)]
508 async fn execute_single_target_internal(
509 _duration: &str,
510 vus: u32,
511 _scenario_str: &str,
512 _operations: &Option<String>,
513 auth: &Option<String>,
514 _headers: &Option<String>,
515 threshold_percentile: &str,
516 threshold_ms: u64,
517 max_error_rate: f64,
518 verbose: bool,
519 skip_tls_verify: bool,
520 base_path: Option<&String>,
521 target: &TargetConfig,
522 target_index: usize,
523 templates: &[crate::request_gen::RequestTemplate],
524 base_headers: &HashMap<String, String>,
525 scenario: &LoadScenario,
526 duration_secs: u64,
527 base_output: &Path,
528 security_testing_enabled: bool,
529 chunked_request_bodies: bool,
530 enhancement_code: &str,
531 ) -> Result<TargetResult> {
532 let mut custom_headers = base_headers.clone();
534 if let Some(target_headers) = &target.headers {
535 custom_headers.extend(target_headers.clone());
536 }
537
538 let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
540
541 let k6_config = K6Config {
543 target_url: target.url.clone(),
544 base_path: base_path.cloned(),
545 scenario: scenario.clone(),
546 duration_secs,
547 max_vus: vus,
548 threshold_percentile: threshold_percentile.to_string(),
549 threshold_ms,
550 max_error_rate,
551 auth_header,
552 custom_headers,
553 skip_tls_verify,
554 security_testing_enabled,
555 chunked_request_bodies,
556 };
557
558 let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
560 let mut script = generator.generate()?;
561
562 if !enhancement_code.is_empty() {
564 if let Some(pos) = script.find("export const options") {
565 script.insert_str(pos, enhancement_code);
566 }
567 }
568
569 let validation_errors = K6ScriptGenerator::validate_script(&script);
571 if !validation_errors.is_empty() {
572 return Err(BenchError::Other(format!(
573 "Script validation failed for target {}: {}",
574 target.url,
575 validation_errors.join(", ")
576 )));
577 }
578
579 let output_dir = base_output.join(format!("target_{}", target_index + 1));
581 std::fs::create_dir_all(&output_dir)?;
582
583 let script_path = output_dir.join("k6-script.js");
585 std::fs::write(&script_path, script)?;
586
587 let api_port = 6565 + (target_index as u16) + 1; let executor = K6Executor::new()?;
591 let results = executor
592 .execute_with_port(&script_path, Some(&output_dir), verbose, Some(api_port))
593 .await;
594
595 match results {
596 Ok(k6_results) => Ok(TargetResult {
597 target_url: target.url.clone(),
598 target_index,
599 results: k6_results,
600 output_dir,
601 success: true,
602 error: None,
603 }),
604 Err(e) => Ok(TargetResult {
605 target_url: target.url.clone(),
606 target_index,
607 results: K6Results::default(),
608 output_dir,
609 success: false,
610 error: Some(e.to_string()),
611 }),
612 }
613 }
614}
615
616#[cfg(test)]
617mod tests {
618 use super::*;
619
620 #[test]
621 fn test_aggregated_metrics_from_results() {
622 let results = vec![
623 TargetResult {
624 target_url: "http://api1.com".to_string(),
625 target_index: 0,
626 results: K6Results {
627 total_requests: 100,
628 failed_requests: 5,
629 avg_duration_ms: 100.0,
630 p95_duration_ms: 200.0,
631 p99_duration_ms: 300.0,
632 ..Default::default()
633 },
634 output_dir: PathBuf::from("output1"),
635 success: true,
636 error: None,
637 },
638 TargetResult {
639 target_url: "http://api2.com".to_string(),
640 target_index: 1,
641 results: K6Results {
642 total_requests: 200,
643 failed_requests: 10,
644 avg_duration_ms: 150.0,
645 p95_duration_ms: 250.0,
646 p99_duration_ms: 350.0,
647 ..Default::default()
648 },
649 output_dir: PathBuf::from("output2"),
650 success: true,
651 error: None,
652 },
653 ];
654
655 let metrics = AggregatedMetrics::from_results(&results);
656 assert_eq!(metrics.total_requests, 300);
657 assert_eq!(metrics.total_failed_requests, 15);
658 assert_eq!(metrics.avg_duration_ms, 125.0); }
660
661 #[test]
662 fn test_aggregated_metrics_with_failed_targets() {
663 let results = vec![
664 TargetResult {
665 target_url: "http://api1.com".to_string(),
666 target_index: 0,
667 results: K6Results {
668 total_requests: 100,
669 failed_requests: 5,
670 avg_duration_ms: 100.0,
671 p95_duration_ms: 200.0,
672 p99_duration_ms: 300.0,
673 ..Default::default()
674 },
675 output_dir: PathBuf::from("output1"),
676 success: true,
677 error: None,
678 },
679 TargetResult {
680 target_url: "http://api2.com".to_string(),
681 target_index: 1,
682 results: K6Results::default(),
683 output_dir: PathBuf::from("output2"),
684 success: false,
685 error: Some("Network error".to_string()),
686 },
687 ];
688
689 let metrics = AggregatedMetrics::from_results(&results);
690 assert_eq!(metrics.total_requests, 100);
692 assert_eq!(metrics.total_failed_requests, 5);
693 assert_eq!(metrics.avg_duration_ms, 100.0);
694 }
695
696 #[test]
697 fn test_aggregated_metrics_empty_results() {
698 let results = vec![];
699 let metrics = AggregatedMetrics::from_results(&results);
700 assert_eq!(metrics.total_requests, 0);
701 assert_eq!(metrics.total_failed_requests, 0);
702 assert_eq!(metrics.avg_duration_ms, 0.0);
703 assert_eq!(metrics.error_rate, 0.0);
704 }
705
706 #[test]
707 fn test_aggregated_metrics_error_rate_calculation() {
708 let results = vec![TargetResult {
709 target_url: "http://api1.com".to_string(),
710 target_index: 0,
711 results: K6Results {
712 total_requests: 1000,
713 failed_requests: 50,
714 avg_duration_ms: 100.0,
715 p95_duration_ms: 200.0,
716 p99_duration_ms: 300.0,
717 ..Default::default()
718 },
719 output_dir: PathBuf::from("output1"),
720 success: true,
721 error: None,
722 }];
723
724 let metrics = AggregatedMetrics::from_results(&results);
725 assert_eq!(metrics.error_rate, 5.0); }
727
728 #[test]
729 fn test_aggregated_metrics_p95_p99_calculation() {
730 let results = vec![
731 TargetResult {
732 target_url: "http://api1.com".to_string(),
733 target_index: 0,
734 results: K6Results {
735 total_requests: 100,
736 failed_requests: 0,
737 avg_duration_ms: 100.0,
738 p95_duration_ms: 150.0,
739 p99_duration_ms: 200.0,
740 ..Default::default()
741 },
742 output_dir: PathBuf::from("output1"),
743 success: true,
744 error: None,
745 },
746 TargetResult {
747 target_url: "http://api2.com".to_string(),
748 target_index: 1,
749 results: K6Results {
750 total_requests: 100,
751 failed_requests: 0,
752 avg_duration_ms: 200.0,
753 p95_duration_ms: 250.0,
754 p99_duration_ms: 300.0,
755 ..Default::default()
756 },
757 output_dir: PathBuf::from("output2"),
758 success: true,
759 error: None,
760 },
761 TargetResult {
762 target_url: "http://api3.com".to_string(),
763 target_index: 2,
764 results: K6Results {
765 total_requests: 100,
766 failed_requests: 0,
767 avg_duration_ms: 300.0,
768 p95_duration_ms: 350.0,
769 p99_duration_ms: 400.0,
770 ..Default::default()
771 },
772 output_dir: PathBuf::from("output3"),
773 success: true,
774 error: None,
775 },
776 ];
777
778 let metrics = AggregatedMetrics::from_results(&results);
779 assert_eq!(metrics.p95_duration_ms, 350.0);
782 assert_eq!(metrics.p99_duration_ms, 400.0);
783 }
784}