mockforge_bench/
parallel_executor.rs

1//! Parallel execution engine for multi-target bench testing
2//!
3//! Executes load tests against multiple targets in parallel with configurable
4//! concurrency limits. Uses tokio for async execution and semaphores for
5//! backpressure control.
6
7use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
24/// Result for a single target execution
25#[derive(Debug, Clone)]
26pub struct TargetResult {
27    /// Target URL that was tested
28    pub target_url: String,
29    /// Index of the target (for ordering)
30    pub target_index: usize,
31    /// k6 test results
32    pub results: K6Results,
33    /// Output directory for this target
34    pub output_dir: PathBuf,
35    /// Whether the test succeeded
36    pub success: bool,
37    /// Error message if test failed
38    pub error: Option<String>,
39}
40
41/// Aggregated results from all target executions
42#[derive(Debug, Clone)]
43pub struct AggregatedResults {
44    /// Results for each target
45    pub target_results: Vec<TargetResult>,
46    /// Overall statistics
47    pub total_targets: usize,
48    pub successful_targets: usize,
49    pub failed_targets: usize,
50    /// Aggregated metrics across all targets
51    pub aggregated_metrics: AggregatedMetrics,
52}
53
54/// Aggregated metrics across all targets
55#[derive(Debug, Clone)]
56pub struct AggregatedMetrics {
57    /// Total requests across all targets
58    pub total_requests: u64,
59    /// Total failed requests across all targets
60    pub total_failed_requests: u64,
61    /// Average response time across all targets (ms)
62    pub avg_duration_ms: f64,
63    /// p95 response time across all targets (ms)
64    pub p95_duration_ms: f64,
65    /// p99 response time across all targets (ms)
66    pub p99_duration_ms: f64,
67    /// Overall error rate percentage
68    pub error_rate: f64,
69}
70
71impl AggregatedMetrics {
72    /// Calculate aggregated metrics from target results
73    fn from_results(results: &[TargetResult]) -> Self {
74        let mut total_requests = 0u64;
75        let mut total_failed_requests = 0u64;
76        let mut durations = Vec::new();
77        let mut p95_values = Vec::new();
78        let mut p99_values = Vec::new();
79
80        for result in results {
81            if result.success {
82                total_requests += result.results.total_requests;
83                total_failed_requests += result.results.failed_requests;
84                durations.push(result.results.avg_duration_ms);
85                p95_values.push(result.results.p95_duration_ms);
86                p99_values.push(result.results.p99_duration_ms);
87            }
88        }
89
90        let avg_duration_ms = if !durations.is_empty() {
91            durations.iter().sum::<f64>() / durations.len() as f64
92        } else {
93            0.0
94        };
95
96        let p95_duration_ms = if !p95_values.is_empty() {
97            p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
98            let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
99            p95_values[index.min(p95_values.len() - 1)]
100        } else {
101            0.0
102        };
103
104        let p99_duration_ms = if !p99_values.is_empty() {
105            p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
106            let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
107            p99_values[index.min(p99_values.len() - 1)]
108        } else {
109            0.0
110        };
111
112        let error_rate = if total_requests > 0 {
113            (total_failed_requests as f64 / total_requests as f64) * 100.0
114        } else {
115            0.0
116        };
117
118        Self {
119            total_requests,
120            total_failed_requests,
121            avg_duration_ms,
122            p95_duration_ms,
123            p99_duration_ms,
124            error_rate,
125        }
126    }
127}
128
129/// Parallel executor for multi-target bench testing
130pub struct ParallelExecutor {
131    /// Base command configuration (shared across all targets)
132    base_command: BenchCommand,
133    /// List of targets to test
134    targets: Vec<TargetConfig>,
135    /// Maximum number of concurrent executions
136    max_concurrency: usize,
137    /// Base output directory
138    base_output: PathBuf,
139}
140
141impl ParallelExecutor {
142    /// Create a new parallel executor
143    pub fn new(
144        base_command: BenchCommand,
145        targets: Vec<TargetConfig>,
146        max_concurrency: usize,
147    ) -> Self {
148        let base_output = base_command.output.clone();
149        Self {
150            base_command,
151            targets,
152            max_concurrency,
153            base_output,
154        }
155    }
156
157    /// Execute tests against all targets in parallel
158    pub async fn execute_all(&self) -> Result<AggregatedResults> {
159        let total_targets = self.targets.len();
160        TerminalReporter::print_progress(&format!(
161            "Starting parallel execution for {} targets (max concurrency: {})",
162            total_targets, self.max_concurrency
163        ));
164
165        // Validate k6 installation
166        if !K6Executor::is_k6_installed() {
167            TerminalReporter::print_error("k6 is not installed");
168            TerminalReporter::print_warning(
169                "Install k6 from: https://k6.io/docs/get-started/installation/",
170            );
171            return Err(BenchError::K6NotFound);
172        }
173
174        // Load and parse spec (shared across all targets)
175        TerminalReporter::print_progress("Loading OpenAPI specification...");
176        let parser = SpecParser::from_file(&self.base_command.spec).await?;
177        TerminalReporter::print_success("Specification loaded");
178
179        // Get operations
180        let operations = if let Some(filter) = &self.base_command.operations {
181            parser.filter_operations(filter)?
182        } else {
183            parser.get_operations()
184        };
185
186        if operations.is_empty() {
187            return Err(BenchError::Other("No operations found in spec".to_string()));
188        }
189
190        TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
191
192        // Generate request templates (shared across all targets)
193        TerminalReporter::print_progress("Generating request templates...");
194        let templates: Vec<_> = operations
195            .iter()
196            .map(RequestGenerator::generate_template)
197            .collect::<Result<Vec<_>>>()?;
198        TerminalReporter::print_success("Request templates generated");
199
200        // Parse base headers
201        let base_headers = self.base_command.parse_headers()?;
202
203        // Parse scenario
204        let scenario = LoadScenario::from_str(&self.base_command.scenario)
205            .map_err(BenchError::InvalidScenario)?;
206
207        let duration_secs = BenchCommand::parse_duration(&self.base_command.duration)?;
208
209        // Create semaphore for concurrency control
210        let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
211        let multi_progress = MultiProgress::new();
212
213        // Create progress bars for each target
214        let progress_bars: Vec<ProgressBar> = (0..total_targets)
215            .map(|i| {
216                let pb = multi_progress.add(ProgressBar::new(1));
217                pb.set_style(
218                    ProgressStyle::default_bar()
219                        .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
220                        .unwrap(),
221                );
222                pb.set_message(format!("Target {}", i + 1));
223                pb
224            })
225            .collect();
226
227        // Spawn tasks for each target
228        let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
229
230        for (index, target) in self.targets.iter().enumerate() {
231            let target = target.clone();
232            // Clone necessary fields from base_command instead of passing reference
233            let spec = self.base_command.spec.clone();
234            let duration = self.base_command.duration.clone();
235            let vus = self.base_command.vus;
236            let scenario_str = self.base_command.scenario.clone();
237            let operations = self.base_command.operations.clone();
238            let auth = self.base_command.auth.clone();
239            let headers = self.base_command.headers.clone();
240            let threshold_percentile = self.base_command.threshold_percentile.clone();
241            let threshold_ms = self.base_command.threshold_ms;
242            let max_error_rate = self.base_command.max_error_rate;
243            let verbose = self.base_command.verbose;
244            let skip_tls_verify = self.base_command.skip_tls_verify;
245
246            let templates = templates.clone();
247            let base_headers = base_headers.clone();
248            let scenario = scenario.clone();
249            let duration_secs = duration_secs;
250            let base_output = self.base_output.clone();
251            let semaphore = semaphore.clone();
252            let progress_bar = progress_bars[index].clone();
253            let target_index = index;
254
255            let handle = tokio::spawn(async move {
256                // Acquire semaphore permit
257                let _permit = semaphore.acquire().await.map_err(|e| {
258                    BenchError::Other(format!("Failed to acquire semaphore: {}", e))
259                })?;
260
261                progress_bar.set_message(format!("Testing {}", target.url));
262
263                // Create a temporary BenchCommand for this target execution
264                // We only need it to call execute_single_target, so we'll pass individual fields
265                // Execute test for this target
266                let result = Self::execute_single_target_internal(
267                    &spec,
268                    &duration,
269                    vus,
270                    &scenario_str,
271                    &operations,
272                    &auth,
273                    &headers,
274                    &threshold_percentile,
275                    threshold_ms,
276                    max_error_rate,
277                    verbose,
278                    skip_tls_verify,
279                    &target,
280                    target_index,
281                    &templates,
282                    &base_headers,
283                    &scenario,
284                    duration_secs,
285                    &base_output,
286                )
287                .await;
288
289                progress_bar.inc(1);
290                progress_bar.finish_with_message(format!("Completed {}", target.url));
291
292                result
293            });
294
295            handles.push(handle);
296        }
297
298        // Wait for all tasks to complete and collect results
299        let mut target_results = Vec::new();
300        for (index, handle) in handles.into_iter().enumerate() {
301            match handle.await {
302                Ok(Ok(result)) => {
303                    target_results.push(result);
304                }
305                Ok(Err(e)) => {
306                    // Create error result
307                    let target_url = self.targets[index].url.clone();
308                    target_results.push(TargetResult {
309                        target_url: target_url.clone(),
310                        target_index: index,
311                        results: K6Results::default(),
312                        output_dir: self.base_output.join(format!("target_{}", index + 1)),
313                        success: false,
314                        error: Some(e.to_string()),
315                    });
316                }
317                Err(e) => {
318                    // Join error
319                    let target_url = self.targets[index].url.clone();
320                    target_results.push(TargetResult {
321                        target_url: target_url.clone(),
322                        target_index: index,
323                        results: K6Results::default(),
324                        output_dir: self.base_output.join(format!("target_{}", index + 1)),
325                        success: false,
326                        error: Some(format!("Task join error: {}", e)),
327                    });
328                }
329            }
330        }
331
332        // Sort results by target index
333        target_results.sort_by_key(|r| r.target_index);
334
335        // Calculate aggregated metrics
336        let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
337
338        let successful_targets = target_results.iter().filter(|r| r.success).count();
339        let failed_targets = total_targets - successful_targets;
340
341        Ok(AggregatedResults {
342            target_results,
343            total_targets,
344            successful_targets,
345            failed_targets,
346            aggregated_metrics,
347        })
348    }
349
350    /// Execute a single target test (internal method that doesn't require BenchCommand)
351    async fn execute_single_target_internal(
352        spec: &PathBuf,
353        duration: &str,
354        vus: u32,
355        scenario_str: &str,
356        operations: &Option<String>,
357        auth: &Option<String>,
358        headers: &Option<String>,
359        threshold_percentile: &str,
360        threshold_ms: u64,
361        max_error_rate: f64,
362        verbose: bool,
363        skip_tls_verify: bool,
364        target: &TargetConfig,
365        target_index: usize,
366        templates: &[crate::request_gen::RequestTemplate],
367        base_headers: &HashMap<String, String>,
368        scenario: &LoadScenario,
369        duration_secs: u64,
370        base_output: &Path,
371    ) -> Result<TargetResult> {
372        // Merge target-specific headers with base headers
373        let mut custom_headers = base_headers.clone();
374        if let Some(target_headers) = &target.headers {
375            custom_headers.extend(target_headers.clone());
376        }
377
378        // Use target-specific auth if provided, otherwise use base auth
379        let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
380
381        // Create k6 config for this target
382        let k6_config = K6Config {
383            target_url: target.url.clone(),
384            scenario: scenario.clone(),
385            duration_secs,
386            max_vus: vus,
387            threshold_percentile: threshold_percentile.to_string(),
388            threshold_ms,
389            max_error_rate,
390            auth_header,
391            custom_headers,
392            skip_tls_verify,
393        };
394
395        // Generate k6 script
396        let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
397        let script = generator.generate()?;
398
399        // Validate script
400        let validation_errors = K6ScriptGenerator::validate_script(&script);
401        if !validation_errors.is_empty() {
402            return Err(BenchError::Other(format!(
403                "Script validation failed for target {}: {}",
404                target.url,
405                validation_errors.join(", ")
406            )));
407        }
408
409        // Create output directory for this target
410        let output_dir = base_output.join(format!("target_{}", target_index + 1));
411        std::fs::create_dir_all(&output_dir)?;
412
413        // Write script to file
414        let script_path = output_dir.join("k6-script.js");
415        std::fs::write(&script_path, script)?;
416
417        // Execute k6
418        let executor = K6Executor::new()?;
419        let results = executor.execute(&script_path, Some(&output_dir), verbose).await;
420
421        match results {
422            Ok(k6_results) => Ok(TargetResult {
423                target_url: target.url.clone(),
424                target_index,
425                results: k6_results,
426                output_dir,
427                success: true,
428                error: None,
429            }),
430            Err(e) => Ok(TargetResult {
431                target_url: target.url.clone(),
432                target_index,
433                results: K6Results::default(),
434                output_dir,
435                success: false,
436                error: Some(e.to_string()),
437            }),
438        }
439    }
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445
446    #[test]
447    fn test_aggregated_metrics_from_results() {
448        let results = vec![
449            TargetResult {
450                target_url: "http://api1.com".to_string(),
451                target_index: 0,
452                results: K6Results {
453                    total_requests: 100,
454                    failed_requests: 5,
455                    avg_duration_ms: 100.0,
456                    p95_duration_ms: 200.0,
457                    p99_duration_ms: 300.0,
458                },
459                output_dir: PathBuf::from("output1"),
460                success: true,
461                error: None,
462            },
463            TargetResult {
464                target_url: "http://api2.com".to_string(),
465                target_index: 1,
466                results: K6Results {
467                    total_requests: 200,
468                    failed_requests: 10,
469                    avg_duration_ms: 150.0,
470                    p95_duration_ms: 250.0,
471                    p99_duration_ms: 350.0,
472                },
473                output_dir: PathBuf::from("output2"),
474                success: true,
475                error: None,
476            },
477        ];
478
479        let metrics = AggregatedMetrics::from_results(&results);
480        assert_eq!(metrics.total_requests, 300);
481        assert_eq!(metrics.total_failed_requests, 15);
482        assert_eq!(metrics.avg_duration_ms, 125.0); // (100 + 150) / 2
483    }
484
485    #[test]
486    fn test_aggregated_metrics_with_failed_targets() {
487        let results = vec![
488            TargetResult {
489                target_url: "http://api1.com".to_string(),
490                target_index: 0,
491                results: K6Results {
492                    total_requests: 100,
493                    failed_requests: 5,
494                    avg_duration_ms: 100.0,
495                    p95_duration_ms: 200.0,
496                    p99_duration_ms: 300.0,
497                },
498                output_dir: PathBuf::from("output1"),
499                success: true,
500                error: None,
501            },
502            TargetResult {
503                target_url: "http://api2.com".to_string(),
504                target_index: 1,
505                results: K6Results::default(),
506                output_dir: PathBuf::from("output2"),
507                success: false,
508                error: Some("Network error".to_string()),
509            },
510        ];
511
512        let metrics = AggregatedMetrics::from_results(&results);
513        // Only successful target should be counted
514        assert_eq!(metrics.total_requests, 100);
515        assert_eq!(metrics.total_failed_requests, 5);
516        assert_eq!(metrics.avg_duration_ms, 100.0);
517    }
518
519    #[test]
520    fn test_aggregated_metrics_empty_results() {
521        let results = vec![];
522        let metrics = AggregatedMetrics::from_results(&results);
523        assert_eq!(metrics.total_requests, 0);
524        assert_eq!(metrics.total_failed_requests, 0);
525        assert_eq!(metrics.avg_duration_ms, 0.0);
526        assert_eq!(metrics.error_rate, 0.0);
527    }
528
529    #[test]
530    fn test_aggregated_metrics_error_rate_calculation() {
531        let results = vec![TargetResult {
532            target_url: "http://api1.com".to_string(),
533            target_index: 0,
534            results: K6Results {
535                total_requests: 1000,
536                failed_requests: 50,
537                avg_duration_ms: 100.0,
538                p95_duration_ms: 200.0,
539                p99_duration_ms: 300.0,
540            },
541            output_dir: PathBuf::from("output1"),
542            success: true,
543            error: None,
544        }];
545
546        let metrics = AggregatedMetrics::from_results(&results);
547        assert_eq!(metrics.error_rate, 5.0); // 50/1000 * 100
548    }
549
550    #[test]
551    fn test_aggregated_metrics_p95_p99_calculation() {
552        let results = vec![
553            TargetResult {
554                target_url: "http://api1.com".to_string(),
555                target_index: 0,
556                results: K6Results {
557                    total_requests: 100,
558                    failed_requests: 0,
559                    avg_duration_ms: 100.0,
560                    p95_duration_ms: 150.0,
561                    p99_duration_ms: 200.0,
562                },
563                output_dir: PathBuf::from("output1"),
564                success: true,
565                error: None,
566            },
567            TargetResult {
568                target_url: "http://api2.com".to_string(),
569                target_index: 1,
570                results: K6Results {
571                    total_requests: 100,
572                    failed_requests: 0,
573                    avg_duration_ms: 200.0,
574                    p95_duration_ms: 250.0,
575                    p99_duration_ms: 300.0,
576                },
577                output_dir: PathBuf::from("output2"),
578                success: true,
579                error: None,
580            },
581            TargetResult {
582                target_url: "http://api3.com".to_string(),
583                target_index: 2,
584                results: K6Results {
585                    total_requests: 100,
586                    failed_requests: 0,
587                    avg_duration_ms: 300.0,
588                    p95_duration_ms: 350.0,
589                    p99_duration_ms: 400.0,
590                },
591                output_dir: PathBuf::from("output3"),
592                success: true,
593                error: None,
594            },
595        ];
596
597        let metrics = AggregatedMetrics::from_results(&results);
598        // p95 should be the 95th percentile of [150, 250, 350] = index 2 = 350
599        // p99 should be the 99th percentile of [200, 300, 400] = index 2 = 400
600        assert_eq!(metrics.p95_duration_ms, 350.0);
601        assert_eq!(metrics.p99_duration_ms, 400.0);
602    }
603}