1use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
24#[derive(Debug, Clone)]
26pub struct TargetResult {
27 pub target_url: String,
29 pub target_index: usize,
31 pub results: K6Results,
33 pub output_dir: PathBuf,
35 pub success: bool,
37 pub error: Option<String>,
39}
40
41#[derive(Debug, Clone)]
43pub struct AggregatedResults {
44 pub target_results: Vec<TargetResult>,
46 pub total_targets: usize,
48 pub successful_targets: usize,
49 pub failed_targets: usize,
50 pub aggregated_metrics: AggregatedMetrics,
52}
53
54#[derive(Debug, Clone)]
56pub struct AggregatedMetrics {
57 pub total_requests: u64,
59 pub total_failed_requests: u64,
61 pub avg_duration_ms: f64,
63 pub p95_duration_ms: f64,
65 pub p99_duration_ms: f64,
67 pub error_rate: f64,
69}
70
71impl AggregatedMetrics {
72 fn from_results(results: &[TargetResult]) -> Self {
74 let mut total_requests = 0u64;
75 let mut total_failed_requests = 0u64;
76 let mut durations = Vec::new();
77 let mut p95_values = Vec::new();
78 let mut p99_values = Vec::new();
79
80 for result in results {
81 if result.success {
82 total_requests += result.results.total_requests;
83 total_failed_requests += result.results.failed_requests;
84 durations.push(result.results.avg_duration_ms);
85 p95_values.push(result.results.p95_duration_ms);
86 p99_values.push(result.results.p99_duration_ms);
87 }
88 }
89
90 let avg_duration_ms = if !durations.is_empty() {
91 durations.iter().sum::<f64>() / durations.len() as f64
92 } else {
93 0.0
94 };
95
96 let p95_duration_ms = if !p95_values.is_empty() {
97 p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
98 let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
99 p95_values[index.min(p95_values.len() - 1)]
100 } else {
101 0.0
102 };
103
104 let p99_duration_ms = if !p99_values.is_empty() {
105 p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
106 let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
107 p99_values[index.min(p99_values.len() - 1)]
108 } else {
109 0.0
110 };
111
112 let error_rate = if total_requests > 0 {
113 (total_failed_requests as f64 / total_requests as f64) * 100.0
114 } else {
115 0.0
116 };
117
118 Self {
119 total_requests,
120 total_failed_requests,
121 avg_duration_ms,
122 p95_duration_ms,
123 p99_duration_ms,
124 error_rate,
125 }
126 }
127}
128
129pub struct ParallelExecutor {
131 base_command: BenchCommand,
133 targets: Vec<TargetConfig>,
135 max_concurrency: usize,
137 base_output: PathBuf,
139}
140
141impl ParallelExecutor {
142 pub fn new(
144 base_command: BenchCommand,
145 targets: Vec<TargetConfig>,
146 max_concurrency: usize,
147 ) -> Self {
148 let base_output = base_command.output.clone();
149 Self {
150 base_command,
151 targets,
152 max_concurrency,
153 base_output,
154 }
155 }
156
157 pub async fn execute_all(&self) -> Result<AggregatedResults> {
159 let total_targets = self.targets.len();
160 TerminalReporter::print_progress(&format!(
161 "Starting parallel execution for {} targets (max concurrency: {})",
162 total_targets, self.max_concurrency
163 ));
164
165 if !K6Executor::is_k6_installed() {
167 TerminalReporter::print_error("k6 is not installed");
168 TerminalReporter::print_warning(
169 "Install k6 from: https://k6.io/docs/get-started/installation/",
170 );
171 return Err(BenchError::K6NotFound);
172 }
173
174 TerminalReporter::print_progress("Loading OpenAPI specification(s)...");
176 let merged_spec = self.base_command.load_and_merge_specs().await?;
177 let parser = SpecParser::from_spec(merged_spec);
178 TerminalReporter::print_success("Specification(s) loaded");
179
180 let operations = if let Some(filter) = &self.base_command.operations {
182 parser.filter_operations(filter)?
183 } else {
184 parser.get_operations()
185 };
186
187 if operations.is_empty() {
188 return Err(BenchError::Other("No operations found in spec".to_string()));
189 }
190
191 TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
192
193 TerminalReporter::print_progress("Generating request templates...");
195 let templates: Vec<_> = operations
196 .iter()
197 .map(RequestGenerator::generate_template)
198 .collect::<Result<Vec<_>>>()?;
199 TerminalReporter::print_success("Request templates generated");
200
201 let base_headers = self.base_command.parse_headers()?;
203
204 let scenario = LoadScenario::from_str(&self.base_command.scenario)
206 .map_err(BenchError::InvalidScenario)?;
207
208 let duration_secs = BenchCommand::parse_duration(&self.base_command.duration)?;
209
210 let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
212 let multi_progress = MultiProgress::new();
213
214 let progress_bars: Vec<ProgressBar> = (0..total_targets)
216 .map(|i| {
217 let pb = multi_progress.add(ProgressBar::new(1));
218 pb.set_style(
219 ProgressStyle::default_bar()
220 .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
221 .unwrap(),
222 );
223 pb.set_message(format!("Target {}", i + 1));
224 pb
225 })
226 .collect();
227
228 let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
230
231 for (index, target) in self.targets.iter().enumerate() {
232 let target = target.clone();
233 let duration = self.base_command.duration.clone();
235 let vus = self.base_command.vus;
236 let scenario_str = self.base_command.scenario.clone();
237 let operations = self.base_command.operations.clone();
238 let auth = self.base_command.auth.clone();
239 let headers = self.base_command.headers.clone();
240 let threshold_percentile = self.base_command.threshold_percentile.clone();
241 let threshold_ms = self.base_command.threshold_ms;
242 let max_error_rate = self.base_command.max_error_rate;
243 let verbose = self.base_command.verbose;
244 let skip_tls_verify = self.base_command.skip_tls_verify;
245
246 let templates = templates.clone();
247 let base_headers = base_headers.clone();
248 let scenario = scenario.clone();
249 let duration_secs = duration_secs;
250 let base_output = self.base_output.clone();
251 let semaphore = semaphore.clone();
252 let progress_bar = progress_bars[index].clone();
253 let target_index = index;
254
255 let handle = tokio::spawn(async move {
256 let _permit = semaphore.acquire().await.map_err(|e| {
258 BenchError::Other(format!("Failed to acquire semaphore: {}", e))
259 })?;
260
261 progress_bar.set_message(format!("Testing {}", target.url));
262
263 let result = Self::execute_single_target_internal(
267 &duration,
268 vus,
269 &scenario_str,
270 &operations,
271 &auth,
272 &headers,
273 &threshold_percentile,
274 threshold_ms,
275 max_error_rate,
276 verbose,
277 skip_tls_verify,
278 &target,
279 target_index,
280 &templates,
281 &base_headers,
282 &scenario,
283 duration_secs,
284 &base_output,
285 )
286 .await;
287
288 progress_bar.inc(1);
289 progress_bar.finish_with_message(format!("Completed {}", target.url));
290
291 result
292 });
293
294 handles.push(handle);
295 }
296
297 let mut target_results = Vec::new();
299 for (index, handle) in handles.into_iter().enumerate() {
300 match handle.await {
301 Ok(Ok(result)) => {
302 target_results.push(result);
303 }
304 Ok(Err(e)) => {
305 let target_url = self.targets[index].url.clone();
307 target_results.push(TargetResult {
308 target_url: target_url.clone(),
309 target_index: index,
310 results: K6Results::default(),
311 output_dir: self.base_output.join(format!("target_{}", index + 1)),
312 success: false,
313 error: Some(e.to_string()),
314 });
315 }
316 Err(e) => {
317 let target_url = self.targets[index].url.clone();
319 target_results.push(TargetResult {
320 target_url: target_url.clone(),
321 target_index: index,
322 results: K6Results::default(),
323 output_dir: self.base_output.join(format!("target_{}", index + 1)),
324 success: false,
325 error: Some(format!("Task join error: {}", e)),
326 });
327 }
328 }
329 }
330
331 target_results.sort_by_key(|r| r.target_index);
333
334 let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
336
337 let successful_targets = target_results.iter().filter(|r| r.success).count();
338 let failed_targets = total_targets - successful_targets;
339
340 Ok(AggregatedResults {
341 target_results,
342 total_targets,
343 successful_targets,
344 failed_targets,
345 aggregated_metrics,
346 })
347 }
348
349 #[allow(clippy::too_many_arguments)]
351 async fn execute_single_target_internal(
352 duration: &str,
353 vus: u32,
354 scenario_str: &str,
355 operations: &Option<String>,
356 auth: &Option<String>,
357 headers: &Option<String>,
358 threshold_percentile: &str,
359 threshold_ms: u64,
360 max_error_rate: f64,
361 verbose: bool,
362 skip_tls_verify: bool,
363 target: &TargetConfig,
364 target_index: usize,
365 templates: &[crate::request_gen::RequestTemplate],
366 base_headers: &HashMap<String, String>,
367 scenario: &LoadScenario,
368 duration_secs: u64,
369 base_output: &Path,
370 ) -> Result<TargetResult> {
371 let mut custom_headers = base_headers.clone();
373 if let Some(target_headers) = &target.headers {
374 custom_headers.extend(target_headers.clone());
375 }
376
377 let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
379
380 let k6_config = K6Config {
382 target_url: target.url.clone(),
383 scenario: scenario.clone(),
384 duration_secs,
385 max_vus: vus,
386 threshold_percentile: threshold_percentile.to_string(),
387 threshold_ms,
388 max_error_rate,
389 auth_header,
390 custom_headers,
391 skip_tls_verify,
392 };
393
394 let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
396 let script = generator.generate()?;
397
398 let validation_errors = K6ScriptGenerator::validate_script(&script);
400 if !validation_errors.is_empty() {
401 return Err(BenchError::Other(format!(
402 "Script validation failed for target {}: {}",
403 target.url,
404 validation_errors.join(", ")
405 )));
406 }
407
408 let output_dir = base_output.join(format!("target_{}", target_index + 1));
410 std::fs::create_dir_all(&output_dir)?;
411
412 let script_path = output_dir.join("k6-script.js");
414 std::fs::write(&script_path, script)?;
415
416 let executor = K6Executor::new()?;
418 let results = executor.execute(&script_path, Some(&output_dir), verbose).await;
419
420 match results {
421 Ok(k6_results) => Ok(TargetResult {
422 target_url: target.url.clone(),
423 target_index,
424 results: k6_results,
425 output_dir,
426 success: true,
427 error: None,
428 }),
429 Err(e) => Ok(TargetResult {
430 target_url: target.url.clone(),
431 target_index,
432 results: K6Results::default(),
433 output_dir,
434 success: false,
435 error: Some(e.to_string()),
436 }),
437 }
438 }
439}
440
441#[cfg(test)]
442mod tests {
443 use super::*;
444
445 #[test]
446 fn test_aggregated_metrics_from_results() {
447 let results = vec![
448 TargetResult {
449 target_url: "http://api1.com".to_string(),
450 target_index: 0,
451 results: K6Results {
452 total_requests: 100,
453 failed_requests: 5,
454 avg_duration_ms: 100.0,
455 p95_duration_ms: 200.0,
456 p99_duration_ms: 300.0,
457 },
458 output_dir: PathBuf::from("output1"),
459 success: true,
460 error: None,
461 },
462 TargetResult {
463 target_url: "http://api2.com".to_string(),
464 target_index: 1,
465 results: K6Results {
466 total_requests: 200,
467 failed_requests: 10,
468 avg_duration_ms: 150.0,
469 p95_duration_ms: 250.0,
470 p99_duration_ms: 350.0,
471 },
472 output_dir: PathBuf::from("output2"),
473 success: true,
474 error: None,
475 },
476 ];
477
478 let metrics = AggregatedMetrics::from_results(&results);
479 assert_eq!(metrics.total_requests, 300);
480 assert_eq!(metrics.total_failed_requests, 15);
481 assert_eq!(metrics.avg_duration_ms, 125.0); }
483
484 #[test]
485 fn test_aggregated_metrics_with_failed_targets() {
486 let results = vec![
487 TargetResult {
488 target_url: "http://api1.com".to_string(),
489 target_index: 0,
490 results: K6Results {
491 total_requests: 100,
492 failed_requests: 5,
493 avg_duration_ms: 100.0,
494 p95_duration_ms: 200.0,
495 p99_duration_ms: 300.0,
496 },
497 output_dir: PathBuf::from("output1"),
498 success: true,
499 error: None,
500 },
501 TargetResult {
502 target_url: "http://api2.com".to_string(),
503 target_index: 1,
504 results: K6Results::default(),
505 output_dir: PathBuf::from("output2"),
506 success: false,
507 error: Some("Network error".to_string()),
508 },
509 ];
510
511 let metrics = AggregatedMetrics::from_results(&results);
512 assert_eq!(metrics.total_requests, 100);
514 assert_eq!(metrics.total_failed_requests, 5);
515 assert_eq!(metrics.avg_duration_ms, 100.0);
516 }
517
518 #[test]
519 fn test_aggregated_metrics_empty_results() {
520 let results = vec![];
521 let metrics = AggregatedMetrics::from_results(&results);
522 assert_eq!(metrics.total_requests, 0);
523 assert_eq!(metrics.total_failed_requests, 0);
524 assert_eq!(metrics.avg_duration_ms, 0.0);
525 assert_eq!(metrics.error_rate, 0.0);
526 }
527
528 #[test]
529 fn test_aggregated_metrics_error_rate_calculation() {
530 let results = vec![TargetResult {
531 target_url: "http://api1.com".to_string(),
532 target_index: 0,
533 results: K6Results {
534 total_requests: 1000,
535 failed_requests: 50,
536 avg_duration_ms: 100.0,
537 p95_duration_ms: 200.0,
538 p99_duration_ms: 300.0,
539 },
540 output_dir: PathBuf::from("output1"),
541 success: true,
542 error: None,
543 }];
544
545 let metrics = AggregatedMetrics::from_results(&results);
546 assert_eq!(metrics.error_rate, 5.0); }
548
549 #[test]
550 fn test_aggregated_metrics_p95_p99_calculation() {
551 let results = vec![
552 TargetResult {
553 target_url: "http://api1.com".to_string(),
554 target_index: 0,
555 results: K6Results {
556 total_requests: 100,
557 failed_requests: 0,
558 avg_duration_ms: 100.0,
559 p95_duration_ms: 150.0,
560 p99_duration_ms: 200.0,
561 },
562 output_dir: PathBuf::from("output1"),
563 success: true,
564 error: None,
565 },
566 TargetResult {
567 target_url: "http://api2.com".to_string(),
568 target_index: 1,
569 results: K6Results {
570 total_requests: 100,
571 failed_requests: 0,
572 avg_duration_ms: 200.0,
573 p95_duration_ms: 250.0,
574 p99_duration_ms: 300.0,
575 },
576 output_dir: PathBuf::from("output2"),
577 success: true,
578 error: None,
579 },
580 TargetResult {
581 target_url: "http://api3.com".to_string(),
582 target_index: 2,
583 results: K6Results {
584 total_requests: 100,
585 failed_requests: 0,
586 avg_duration_ms: 300.0,
587 p95_duration_ms: 350.0,
588 p99_duration_ms: 400.0,
589 },
590 output_dir: PathBuf::from("output3"),
591 success: true,
592 error: None,
593 },
594 ];
595
596 let metrics = AggregatedMetrics::from_results(&results);
597 assert_eq!(metrics.p95_duration_ms, 350.0);
600 assert_eq!(metrics.p99_duration_ms, 400.0);
601 }
602}