1use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
/// Outcome of benchmarking a single target.
#[derive(Debug, Clone)]
pub struct TargetResult {
    /// URL of the target this result belongs to.
    pub target_url: String,
    /// Zero-based position of the target in the executor's target list.
    pub target_index: usize,
    /// Metrics reported by k6; `K6Results::default()` when the run did not succeed.
    pub results: K6Results,
    /// Directory that receives this target's generated script and is handed
    /// to the k6 executor as its output location.
    pub output_dir: PathBuf,
    /// `true` when the k6 run returned results, `false` otherwise.
    pub success: bool,
    /// Error description for an unsuccessful run; `None` on success.
    pub error: Option<String>,
}
40
/// Results of a multi-target run: every per-target outcome plus summary numbers.
#[derive(Debug, Clone)]
pub struct AggregatedResults {
    /// One entry per target, ordered by `target_index`.
    pub target_results: Vec<TargetResult>,
    /// Number of targets that were scheduled.
    pub total_targets: usize,
    /// Number of entries in `target_results` with `success == true`.
    pub successful_targets: usize,
    /// `total_targets - successful_targets`.
    pub failed_targets: usize,
    /// Metrics combined across the successful targets only.
    pub aggregated_metrics: AggregatedMetrics,
}
53
/// Metrics combined across all successful targets of a run.
#[derive(Debug, Clone)]
pub struct AggregatedMetrics {
    /// Sum of requests issued against all successful targets.
    pub total_requests: u64,
    /// Sum of failed requests across all successful targets.
    pub total_failed_requests: u64,
    /// Unweighted mean of the per-target average durations (ms).
    pub avg_duration_ms: f64,
    /// Nearest-rank 95th percentile taken over the per-target p95 values (ms).
    pub p95_duration_ms: f64,
    /// Nearest-rank 99th percentile taken over the per-target p99 values (ms).
    pub p99_duration_ms: f64,
    /// Failed requests as a percentage of total requests (`failed / total * 100`).
    pub error_rate: f64,
    /// Sum of the per-target requests-per-second figures.
    pub total_rps: f64,
    /// `total_rps` divided by the number of successful targets.
    pub avg_rps: f64,
    /// Sum of the per-target `vus_max` values.
    pub total_vus_max: u32,
}
76
77impl AggregatedMetrics {
78 fn from_results(results: &[TargetResult]) -> Self {
80 let mut total_requests = 0u64;
81 let mut total_failed_requests = 0u64;
82 let mut durations = Vec::new();
83 let mut p95_values = Vec::new();
84 let mut p99_values = Vec::new();
85 let mut total_rps = 0.0f64;
86 let mut total_vus_max = 0u32;
87 let mut successful_count = 0usize;
88
89 for result in results {
90 if result.success {
91 total_requests += result.results.total_requests;
92 total_failed_requests += result.results.failed_requests;
93 durations.push(result.results.avg_duration_ms);
94 p95_values.push(result.results.p95_duration_ms);
95 p99_values.push(result.results.p99_duration_ms);
96 total_rps += result.results.rps;
97 total_vus_max += result.results.vus_max;
98 successful_count += 1;
99 }
100 }
101
102 let avg_duration_ms = if !durations.is_empty() {
103 durations.iter().sum::<f64>() / durations.len() as f64
104 } else {
105 0.0
106 };
107
108 let p95_duration_ms = if !p95_values.is_empty() {
109 p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
110 let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
111 p95_values[index.min(p95_values.len() - 1)]
112 } else {
113 0.0
114 };
115
116 let p99_duration_ms = if !p99_values.is_empty() {
117 p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
118 let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
119 p99_values[index.min(p99_values.len() - 1)]
120 } else {
121 0.0
122 };
123
124 let error_rate = if total_requests > 0 {
125 (total_failed_requests as f64 / total_requests as f64) * 100.0
126 } else {
127 0.0
128 };
129
130 let avg_rps = if successful_count > 0 {
131 total_rps / successful_count as f64
132 } else {
133 0.0
134 };
135
136 Self {
137 total_requests,
138 total_failed_requests,
139 avg_duration_ms,
140 p95_duration_ms,
141 p99_duration_ms,
142 error_rate,
143 total_rps,
144 avg_rps,
145 total_vus_max,
146 }
147 }
148}
149
/// Runs one k6 benchmark per target, several at a time, and aggregates the results.
pub struct ParallelExecutor {
    /// Benchmark settings shared by all targets (spec, scenario, thresholds, ...).
    base_command: BenchCommand,
    /// Targets to benchmark; a target's own auth and headers take precedence
    /// over the shared ones.
    targets: Vec<TargetConfig>,
    /// Permit count of the semaphore that limits simultaneous k6 runs.
    max_concurrency: usize,
    /// Root output directory (copied from `base_command.output`); target `i`
    /// writes into the `target_{i + 1}` subdirectory.
    base_output: PathBuf,
}
161
162impl ParallelExecutor {
163 pub fn new(
165 base_command: BenchCommand,
166 targets: Vec<TargetConfig>,
167 max_concurrency: usize,
168 ) -> Self {
169 let base_output = base_command.output.clone();
170 Self {
171 base_command,
172 targets,
173 max_concurrency,
174 base_output,
175 }
176 }
177
178 pub async fn execute_all(&self) -> Result<AggregatedResults> {
180 let total_targets = self.targets.len();
181 TerminalReporter::print_progress(&format!(
182 "Starting parallel execution for {} targets (max concurrency: {})",
183 total_targets, self.max_concurrency
184 ));
185
186 if !K6Executor::is_k6_installed() {
188 TerminalReporter::print_error("k6 is not installed");
189 TerminalReporter::print_warning(
190 "Install k6 from: https://k6.io/docs/get-started/installation/",
191 );
192 return Err(BenchError::K6NotFound);
193 }
194
195 TerminalReporter::print_progress("Loading OpenAPI specification(s)...");
197 let merged_spec = self.base_command.load_and_merge_specs().await?;
198 let parser = SpecParser::from_spec(merged_spec);
199 TerminalReporter::print_success("Specification(s) loaded");
200
201 let operations = if let Some(filter) = &self.base_command.operations {
203 parser.filter_operations(filter)?
204 } else {
205 parser.get_operations()
206 };
207
208 if operations.is_empty() {
209 return Err(BenchError::Other("No operations found in spec".to_string()));
210 }
211
212 TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
213
214 TerminalReporter::print_progress("Generating request templates...");
216 let templates: Vec<_> = operations
217 .iter()
218 .map(RequestGenerator::generate_template)
219 .collect::<Result<Vec<_>>>()?;
220 TerminalReporter::print_success("Request templates generated");
221
222 let mut per_target_data: HashMap<
224 PathBuf,
225 (Vec<crate::request_gen::RequestTemplate>, Option<String>),
226 > = HashMap::new();
227 {
228 let mut unique_specs: Vec<PathBuf> = Vec::new();
229 for t in &self.targets {
230 if let Some(spec_path) = &t.spec {
231 if !unique_specs.contains(spec_path) {
232 unique_specs.push(spec_path.clone());
233 }
234 }
235 }
236 for spec_path in &unique_specs {
237 TerminalReporter::print_progress(&format!(
238 "Loading per-target spec: {}",
239 spec_path.display()
240 ));
241 match SpecParser::from_file(spec_path).await {
242 Ok(target_parser) => {
243 let target_ops = if let Some(filter) = &self.base_command.operations {
244 match target_parser.filter_operations(filter) {
245 Ok(ops) => ops,
246 Err(e) => {
247 TerminalReporter::print_warning(&format!(
248 "Failed to filter operations from {}: {}. Using shared spec.",
249 spec_path.display(),
250 e
251 ));
252 continue;
253 }
254 }
255 } else {
256 target_parser.get_operations()
257 };
258 let target_templates: Vec<_> = match target_ops
259 .iter()
260 .map(RequestGenerator::generate_template)
261 .collect::<Result<Vec<_>>>()
262 {
263 Ok(t) => t,
264 Err(e) => {
265 TerminalReporter::print_warning(&format!(
266 "Failed to generate templates from {}: {}. Using shared spec.",
267 spec_path.display(),
268 e
269 ));
270 continue;
271 }
272 };
273 let target_base_path = if let Some(cli_bp) = &self.base_command.base_path {
274 if cli_bp.is_empty() {
275 None
276 } else {
277 Some(cli_bp.clone())
278 }
279 } else {
280 target_parser.get_base_path()
281 };
282 TerminalReporter::print_success(&format!(
283 "Loaded {} operations from {}",
284 target_templates.len(),
285 spec_path.display()
286 ));
287 per_target_data
288 .insert(spec_path.clone(), (target_templates, target_base_path));
289 }
290 Err(e) => {
291 TerminalReporter::print_warning(&format!(
292 "Failed to load per-target spec {}: {}. Targets using this spec will use the shared spec.",
293 spec_path.display(),
294 e
295 ));
296 }
297 }
298 }
299 }
300
301 let base_headers = self.base_command.parse_headers()?;
303
304 let base_path = self.resolve_base_path(&parser);
306 if let Some(ref bp) = base_path {
307 TerminalReporter::print_progress(&format!("Using base path: {}", bp));
308 }
309
310 let scenario = LoadScenario::from_str(&self.base_command.scenario)
312 .map_err(BenchError::InvalidScenario)?;
313
314 let duration_secs_val = BenchCommand::parse_duration(&self.base_command.duration)?;
315
316 let security_testing_enabled_val =
318 self.base_command.security_test || self.base_command.wafbench_dir.is_some();
319
320 let has_advanced_features = self.base_command.data_file.is_some()
322 || self.base_command.error_rate.is_some()
323 || self.base_command.security_test
324 || self.base_command.parallel_create.is_some()
325 || self.base_command.wafbench_dir.is_some();
326
327 let enhancement_code = if has_advanced_features {
328 let dummy_script = "export const options = {};";
329 let enhanced = self.base_command.generate_enhanced_script(dummy_script)?;
330 if let Some(pos) = enhanced.find("export const options") {
331 enhanced[..pos].to_string()
332 } else {
333 String::new()
334 }
335 } else {
336 String::new()
337 };
338
339 let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
341 let multi_progress = MultiProgress::new();
342
343 let progress_bars: Vec<ProgressBar> = (0..total_targets)
345 .map(|i| {
346 let pb = multi_progress.add(ProgressBar::new(1));
347 pb.set_style(
348 ProgressStyle::default_bar()
349 .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
350 .unwrap(),
351 );
352 pb.set_message(format!("Target {}", i + 1));
353 pb
354 })
355 .collect();
356
357 let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
359
360 for (index, target) in self.targets.iter().enumerate() {
361 let target = target.clone();
362 let duration = self.base_command.duration.clone();
364 let vus = self.base_command.vus;
365 let scenario_str = self.base_command.scenario.clone();
366 let operations = self.base_command.operations.clone();
367 let auth = self.base_command.auth.clone();
368 let headers = self.base_command.headers.clone();
369 let threshold_percentile = self.base_command.threshold_percentile.clone();
370 let threshold_ms = self.base_command.threshold_ms;
371 let max_error_rate = self.base_command.max_error_rate;
372 let verbose = self.base_command.verbose;
373 let skip_tls_verify = self.base_command.skip_tls_verify;
374
375 let (templates, base_path) = if let Some(spec_path) = &target.spec {
377 if let Some((t, bp)) = per_target_data.get(spec_path) {
378 (t.clone(), bp.clone())
379 } else {
380 (templates.clone(), base_path.clone())
381 }
382 } else {
383 (templates.clone(), base_path.clone())
384 };
385
386 let base_headers = base_headers.clone();
387 let scenario = scenario.clone();
388 let duration_secs = duration_secs_val;
389 let base_output = self.base_output.clone();
390 let semaphore = semaphore.clone();
391 let progress_bar = progress_bars[index].clone();
392 let target_index = index;
393 let security_testing_enabled = security_testing_enabled_val;
394 let enhancement_code = enhancement_code.clone();
395
396 let handle = tokio::spawn(async move {
397 let _permit = semaphore.acquire().await.map_err(|e| {
399 BenchError::Other(format!("Failed to acquire semaphore: {}", e))
400 })?;
401
402 progress_bar.set_message(format!("Testing {}", target.url));
403
404 let result = Self::execute_single_target_internal(
406 &duration,
407 vus,
408 &scenario_str,
409 &operations,
410 &auth,
411 &headers,
412 &threshold_percentile,
413 threshold_ms,
414 max_error_rate,
415 verbose,
416 skip_tls_verify,
417 base_path.as_ref(),
418 &target,
419 target_index,
420 &templates,
421 &base_headers,
422 &scenario,
423 duration_secs,
424 &base_output,
425 security_testing_enabled,
426 &enhancement_code,
427 )
428 .await;
429
430 progress_bar.inc(1);
431 progress_bar.finish_with_message(format!("Completed {}", target.url));
432
433 result
434 });
435
436 handles.push(handle);
437 }
438
439 let mut target_results = Vec::new();
441 for (index, handle) in handles.into_iter().enumerate() {
442 match handle.await {
443 Ok(Ok(result)) => {
444 target_results.push(result);
445 }
446 Ok(Err(e)) => {
447 let target_url = self.targets[index].url.clone();
449 target_results.push(TargetResult {
450 target_url: target_url.clone(),
451 target_index: index,
452 results: K6Results::default(),
453 output_dir: self.base_output.join(format!("target_{}", index + 1)),
454 success: false,
455 error: Some(e.to_string()),
456 });
457 }
458 Err(e) => {
459 let target_url = self.targets[index].url.clone();
461 target_results.push(TargetResult {
462 target_url: target_url.clone(),
463 target_index: index,
464 results: K6Results::default(),
465 output_dir: self.base_output.join(format!("target_{}", index + 1)),
466 success: false,
467 error: Some(format!("Task join error: {}", e)),
468 });
469 }
470 }
471 }
472
473 target_results.sort_by_key(|r| r.target_index);
475
476 let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
478
479 let successful_targets = target_results.iter().filter(|r| r.success).count();
480 let failed_targets = total_targets - successful_targets;
481
482 Ok(AggregatedResults {
483 target_results,
484 total_targets,
485 successful_targets,
486 failed_targets,
487 aggregated_metrics,
488 })
489 }
490
491 fn resolve_base_path(&self, parser: &SpecParser) -> Option<String> {
493 if let Some(cli_base_path) = &self.base_command.base_path {
495 if cli_base_path.is_empty() {
496 return None;
497 }
498 return Some(cli_base_path.clone());
499 }
500 parser.get_base_path()
502 }
503
504 #[allow(clippy::too_many_arguments)]
506 async fn execute_single_target_internal(
507 _duration: &str,
508 vus: u32,
509 _scenario_str: &str,
510 _operations: &Option<String>,
511 auth: &Option<String>,
512 _headers: &Option<String>,
513 threshold_percentile: &str,
514 threshold_ms: u64,
515 max_error_rate: f64,
516 verbose: bool,
517 skip_tls_verify: bool,
518 base_path: Option<&String>,
519 target: &TargetConfig,
520 target_index: usize,
521 templates: &[crate::request_gen::RequestTemplate],
522 base_headers: &HashMap<String, String>,
523 scenario: &LoadScenario,
524 duration_secs: u64,
525 base_output: &Path,
526 security_testing_enabled: bool,
527 enhancement_code: &str,
528 ) -> Result<TargetResult> {
529 let mut custom_headers = base_headers.clone();
531 if let Some(target_headers) = &target.headers {
532 custom_headers.extend(target_headers.clone());
533 }
534
535 let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
537
538 let k6_config = K6Config {
540 target_url: target.url.clone(),
541 base_path: base_path.cloned(),
542 scenario: scenario.clone(),
543 duration_secs,
544 max_vus: vus,
545 threshold_percentile: threshold_percentile.to_string(),
546 threshold_ms,
547 max_error_rate,
548 auth_header,
549 custom_headers,
550 skip_tls_verify,
551 security_testing_enabled,
552 };
553
554 let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
556 let mut script = generator.generate()?;
557
558 if !enhancement_code.is_empty() {
560 if let Some(pos) = script.find("export const options") {
561 script.insert_str(pos, enhancement_code);
562 }
563 }
564
565 let validation_errors = K6ScriptGenerator::validate_script(&script);
567 if !validation_errors.is_empty() {
568 return Err(BenchError::Other(format!(
569 "Script validation failed for target {}: {}",
570 target.url,
571 validation_errors.join(", ")
572 )));
573 }
574
575 let output_dir = base_output.join(format!("target_{}", target_index + 1));
577 std::fs::create_dir_all(&output_dir)?;
578
579 let script_path = output_dir.join("k6-script.js");
581 std::fs::write(&script_path, script)?;
582
583 let api_port = 6565 + (target_index as u16) + 1; let executor = K6Executor::new()?;
587 let results = executor
588 .execute_with_port(&script_path, Some(&output_dir), verbose, Some(api_port))
589 .await;
590
591 match results {
592 Ok(k6_results) => Ok(TargetResult {
593 target_url: target.url.clone(),
594 target_index,
595 results: k6_results,
596 output_dir,
597 success: true,
598 error: None,
599 }),
600 Err(e) => Ok(TargetResult {
601 target_url: target.url.clone(),
602 target_index,
603 results: K6Results::default(),
604 output_dir,
605 success: false,
606 error: Some(e.to_string()),
607 }),
608 }
609 }
610}
611
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful `TargetResult`; `latency` is `(avg, p95, p99)` in
    /// milliseconds and all other k6 metrics keep their default values.
    fn ok_target(
        target_index: usize,
        target_url: &str,
        output_dir: &str,
        total_requests: u64,
        failed_requests: u64,
        latency: (f64, f64, f64),
    ) -> TargetResult {
        let (avg_duration_ms, p95_duration_ms, p99_duration_ms) = latency;
        TargetResult {
            target_url: target_url.to_string(),
            target_index,
            results: K6Results {
                total_requests,
                failed_requests,
                avg_duration_ms,
                p95_duration_ms,
                p99_duration_ms,
                ..Default::default()
            },
            output_dir: PathBuf::from(output_dir),
            success: true,
            error: None,
        }
    }

    /// Counters are summed and the average duration is the mean of the
    /// per-target averages.
    #[test]
    fn test_aggregated_metrics_from_results() {
        let results = vec![
            ok_target(0, "http://api1.com", "output1", 100, 5, (100.0, 200.0, 300.0)),
            ok_target(1, "http://api2.com", "output2", 200, 10, (150.0, 250.0, 350.0)),
        ];

        let metrics = AggregatedMetrics::from_results(&results);

        assert_eq!(metrics.total_requests, 300);
        assert_eq!(metrics.total_failed_requests, 15);
        assert_eq!(metrics.avg_duration_ms, 125.0);
    }

    /// Unsuccessful targets do not contribute to any metric.
    #[test]
    fn test_aggregated_metrics_with_failed_targets() {
        let failed = TargetResult {
            target_url: "http://api2.com".to_string(),
            target_index: 1,
            results: K6Results::default(),
            output_dir: PathBuf::from("output2"),
            success: false,
            error: Some("Network error".to_string()),
        };
        let results = vec![
            ok_target(0, "http://api1.com", "output1", 100, 5, (100.0, 200.0, 300.0)),
            failed,
        ];

        let metrics = AggregatedMetrics::from_results(&results);

        assert_eq!(metrics.total_requests, 100);
        assert_eq!(metrics.total_failed_requests, 5);
        assert_eq!(metrics.avg_duration_ms, 100.0);
    }

    /// No input yields all-zero metrics rather than a division by zero.
    #[test]
    fn test_aggregated_metrics_empty_results() {
        let results: Vec<TargetResult> = Vec::new();

        let metrics = AggregatedMetrics::from_results(&results);

        assert_eq!(metrics.total_requests, 0);
        assert_eq!(metrics.total_failed_requests, 0);
        assert_eq!(metrics.avg_duration_ms, 0.0);
        assert_eq!(metrics.error_rate, 0.0);
    }

    /// The error rate is expressed as a percentage: 50 of 1000 is 5.0.
    #[test]
    fn test_aggregated_metrics_error_rate_calculation() {
        let results = vec![ok_target(
            0,
            "http://api1.com",
            "output1",
            1000,
            50,
            (100.0, 200.0, 300.0),
        )];

        let metrics = AggregatedMetrics::from_results(&results);

        assert_eq!(metrics.error_rate, 5.0);
    }

    /// With three targets the nearest-rank p95/p99 is the largest
    /// per-target value.
    #[test]
    fn test_aggregated_metrics_p95_p99_calculation() {
        let results = vec![
            ok_target(0, "http://api1.com", "output1", 100, 0, (100.0, 150.0, 200.0)),
            ok_target(1, "http://api2.com", "output2", 100, 0, (200.0, 250.0, 300.0)),
            ok_target(2, "http://api3.com", "output3", 100, 0, (300.0, 350.0, 400.0)),
        ];

        let metrics = AggregatedMetrics::from_results(&results);

        assert_eq!(metrics.p95_duration_ms, 350.0);
        assert_eq!(metrics.p99_duration_ms, 400.0);
    }
}