Skip to main content

mockforge_bench/
parallel_executor.rs

1//! Parallel execution engine for multi-target bench testing
2//!
3//! Executes load tests against multiple targets in parallel with configurable
4//! concurrency limits. Uses tokio for async execution and semaphores for
5//! backpressure control.
6
7use crate::command::BenchCommand;
8use crate::error::{BenchError, Result};
9use crate::executor::{K6Executor, K6Results};
10use crate::k6_gen::{K6Config, K6ScriptGenerator};
11use crate::reporter::TerminalReporter;
12use crate::request_gen::RequestGenerator;
13use crate::scenarios::LoadScenario;
14use crate::spec_parser::SpecParser;
15use crate::target_parser::TargetConfig;
16use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use tokio::sync::Semaphore;
22use tokio::task::JoinHandle;
23
24/// Result for a single target execution
25#[derive(Debug, Clone)]
26pub struct TargetResult {
27    /// Target URL that was tested
28    pub target_url: String,
29    /// Index of the target (for ordering)
30    pub target_index: usize,
31    /// k6 test results
32    pub results: K6Results,
33    /// Output directory for this target
34    pub output_dir: PathBuf,
35    /// Whether the test succeeded
36    pub success: bool,
37    /// Error message if test failed
38    pub error: Option<String>,
39}
40
41/// Aggregated results from all target executions
42#[derive(Debug, Clone)]
43pub struct AggregatedResults {
44    /// Results for each target
45    pub target_results: Vec<TargetResult>,
46    /// Overall statistics
47    pub total_targets: usize,
48    pub successful_targets: usize,
49    pub failed_targets: usize,
50    /// Aggregated metrics across all targets
51    pub aggregated_metrics: AggregatedMetrics,
52}
53
/// Metrics combined over every successful target of a multi-target run.
#[derive(Clone, Debug)]
pub struct AggregatedMetrics {
    /// Sum of requests issued against all targets
    pub total_requests: u64,
    /// Sum of failed requests over all targets
    pub total_failed_requests: u64,
    /// Mean of the per-target average response times (ms)
    pub avg_duration_ms: f64,
    /// p95 response time taken over the per-target p95 values (ms)
    pub p95_duration_ms: f64,
    /// p99 response time taken over the per-target p99 values (ms)
    pub p99_duration_ms: f64,
    /// Failed requests as a percentage of all requests
    pub error_rate: f64,
    /// Sum of the per-target request rates
    pub total_rps: f64,
    /// Mean request rate per successful target
    pub avg_rps: f64,
    /// Sum of the per-target maximum VU counts
    pub total_vus_max: u32,
    /// Connections opened, summed over all targets. Added for Issue #79 (round 12):
    /// multi-target output lacked the CPS / connection counts that single-target
    /// runs already reported.
    pub total_connections_opened: u64,
    /// Iterations completed, summed over all targets
    pub total_iterations_completed: u64,
}
82
83impl AggregatedMetrics {
84    /// Calculate aggregated metrics from target results
85    fn from_results(results: &[TargetResult]) -> Self {
86        let mut total_requests = 0u64;
87        let mut total_failed_requests = 0u64;
88        let mut durations = Vec::new();
89        let mut p95_values = Vec::new();
90        let mut p99_values = Vec::new();
91        let mut total_rps = 0.0f64;
92        let mut total_vus_max = 0u32;
93        let mut total_connections_opened = 0u64;
94        let mut total_iterations_completed = 0u64;
95        let mut successful_count = 0usize;
96
97        for result in results {
98            if result.success {
99                total_requests += result.results.total_requests;
100                total_failed_requests += result.results.failed_requests;
101                durations.push(result.results.avg_duration_ms);
102                p95_values.push(result.results.p95_duration_ms);
103                p99_values.push(result.results.p99_duration_ms);
104                total_rps += result.results.rps;
105                total_vus_max += result.results.vus_max;
106                total_connections_opened += result.results.tcp_connect_samples;
107                total_iterations_completed += result.results.iterations_completed;
108                successful_count += 1;
109            }
110        }
111
112        let avg_duration_ms = if !durations.is_empty() {
113            durations.iter().sum::<f64>() / durations.len() as f64
114        } else {
115            0.0
116        };
117
118        let p95_duration_ms = if !p95_values.is_empty() {
119            p95_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
120            let index = (p95_values.len() as f64 * 0.95).ceil() as usize - 1;
121            p95_values[index.min(p95_values.len() - 1)]
122        } else {
123            0.0
124        };
125
126        let p99_duration_ms = if !p99_values.is_empty() {
127            p99_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
128            let index = (p99_values.len() as f64 * 0.99).ceil() as usize - 1;
129            p99_values[index.min(p99_values.len() - 1)]
130        } else {
131            0.0
132        };
133
134        let error_rate = if total_requests > 0 {
135            (total_failed_requests as f64 / total_requests as f64) * 100.0
136        } else {
137            0.0
138        };
139
140        let avg_rps = if successful_count > 0 {
141            total_rps / successful_count as f64
142        } else {
143            0.0
144        };
145
146        Self {
147            total_requests,
148            total_failed_requests,
149            avg_duration_ms,
150            p95_duration_ms,
151            p99_duration_ms,
152            error_rate,
153            total_rps,
154            avg_rps,
155            total_vus_max,
156            total_connections_opened,
157            total_iterations_completed,
158        }
159    }
160}
161
162/// Parallel executor for multi-target bench testing
163pub struct ParallelExecutor {
164    /// Base command configuration (shared across all targets)
165    base_command: BenchCommand,
166    /// List of targets to test
167    targets: Vec<TargetConfig>,
168    /// Maximum number of concurrent executions
169    max_concurrency: usize,
170    /// Base output directory
171    base_output: PathBuf,
172}
173
174impl ParallelExecutor {
175    /// Create a new parallel executor
176    pub fn new(
177        base_command: BenchCommand,
178        targets: Vec<TargetConfig>,
179        max_concurrency: usize,
180    ) -> Self {
181        let base_output = base_command.output.clone();
182        Self {
183            base_command,
184            targets,
185            max_concurrency,
186            base_output,
187        }
188    }
189
190    /// Execute tests against all targets in parallel
191    pub async fn execute_all(&self) -> Result<AggregatedResults> {
192        let total_targets = self.targets.len();
193        TerminalReporter::print_progress(&format!(
194            "Starting parallel execution for {} targets (max concurrency: {})",
195            total_targets, self.max_concurrency
196        ));
197
198        // Validate k6 installation
199        if !K6Executor::is_k6_installed() {
200            TerminalReporter::print_error("k6 is not installed");
201            TerminalReporter::print_warning(
202                "Install k6 from: https://k6.io/docs/get-started/installation/",
203            );
204            return Err(BenchError::K6NotFound);
205        }
206
207        // Load and parse spec(s) (shared across all targets)
208        TerminalReporter::print_progress("Loading OpenAPI specification(s)...");
209        let merged_spec = self.base_command.load_and_merge_specs().await?;
210        let parser = SpecParser::from_spec(merged_spec);
211        TerminalReporter::print_success("Specification(s) loaded");
212
213        // Get operations
214        let operations = if let Some(filter) = &self.base_command.operations {
215            parser.filter_operations(filter)?
216        } else {
217            parser.get_operations()
218        };
219
220        if operations.is_empty() {
221            return Err(BenchError::Other("No operations found in spec".to_string()));
222        }
223
224        TerminalReporter::print_success(&format!("Found {} operations", operations.len()));
225
226        // Generate request templates (shared across all targets)
227        TerminalReporter::print_progress("Generating request templates...");
228        let templates: Vec<_> = operations
229            .iter()
230            .map(RequestGenerator::generate_template)
231            .collect::<Result<Vec<_>>>()?;
232        TerminalReporter::print_success("Request templates generated");
233
234        // Pre-load per-target specs
235        let mut per_target_data: HashMap<
236            PathBuf,
237            (Vec<crate::request_gen::RequestTemplate>, Option<String>),
238        > = HashMap::new();
239        {
240            let mut unique_specs: Vec<PathBuf> = Vec::new();
241            for t in &self.targets {
242                if let Some(spec_path) = &t.spec {
243                    if !unique_specs.contains(spec_path) {
244                        unique_specs.push(spec_path.clone());
245                    }
246                }
247            }
248            for spec_path in &unique_specs {
249                TerminalReporter::print_progress(&format!(
250                    "Loading per-target spec: {}",
251                    spec_path.display()
252                ));
253                match SpecParser::from_file(spec_path).await {
254                    Ok(target_parser) => {
255                        let target_ops = if let Some(filter) = &self.base_command.operations {
256                            match target_parser.filter_operations(filter) {
257                                Ok(ops) => ops,
258                                Err(e) => {
259                                    TerminalReporter::print_warning(&format!(
260                                        "Failed to filter operations from {}: {}. Using shared spec.",
261                                        spec_path.display(),
262                                        e
263                                    ));
264                                    continue;
265                                }
266                            }
267                        } else {
268                            target_parser.get_operations()
269                        };
270                        let target_templates: Vec<_> = match target_ops
271                            .iter()
272                            .map(RequestGenerator::generate_template)
273                            .collect::<Result<Vec<_>>>()
274                        {
275                            Ok(t) => t,
276                            Err(e) => {
277                                TerminalReporter::print_warning(&format!(
278                                    "Failed to generate templates from {}: {}. Using shared spec.",
279                                    spec_path.display(),
280                                    e
281                                ));
282                                continue;
283                            }
284                        };
285                        let target_base_path = if let Some(cli_bp) = &self.base_command.base_path {
286                            if cli_bp.is_empty() {
287                                None
288                            } else {
289                                Some(cli_bp.clone())
290                            }
291                        } else {
292                            target_parser.get_base_path()
293                        };
294                        TerminalReporter::print_success(&format!(
295                            "Loaded {} operations from {}",
296                            target_templates.len(),
297                            spec_path.display()
298                        ));
299                        per_target_data
300                            .insert(spec_path.clone(), (target_templates, target_base_path));
301                    }
302                    Err(e) => {
303                        TerminalReporter::print_warning(&format!(
304                            "Failed to load per-target spec {}: {}. Targets using this spec will use the shared spec.",
305                            spec_path.display(),
306                            e
307                        ));
308                    }
309                }
310            }
311        }
312
313        // Parse base headers
314        let base_headers = self.base_command.parse_headers()?;
315
316        // Resolve base path (CLI option takes priority over spec's servers URL)
317        let base_path = self.resolve_base_path(&parser);
318        if let Some(ref bp) = base_path {
319            TerminalReporter::print_progress(&format!("Using base path: {}", bp));
320        }
321
322        // Parse scenario
323        let scenario = LoadScenario::from_str(&self.base_command.scenario)
324            .map_err(BenchError::InvalidScenario)?;
325
326        let duration_secs_val = BenchCommand::parse_duration(&self.base_command.duration)?;
327
328        // Compute security testing flag
329        let security_testing_enabled_val =
330            self.base_command.security_test || self.base_command.wafbench_dir.is_some();
331
332        // Pre-compute enhancement code once (same for all targets)
333        let has_advanced_features = self.base_command.data_file.is_some()
334            || self.base_command.error_rate.is_some()
335            || self.base_command.security_test
336            || self.base_command.parallel_create.is_some()
337            || self.base_command.wafbench_dir.is_some();
338
339        let enhancement_code = if has_advanced_features {
340            let dummy_script = "export const options = {};";
341            let enhanced = self.base_command.generate_enhanced_script(dummy_script)?;
342            if let Some(pos) = enhanced.find("export const options") {
343                enhanced[..pos].to_string()
344            } else {
345                String::new()
346            }
347        } else {
348            String::new()
349        };
350
351        // Create semaphore for concurrency control
352        let semaphore = Arc::new(Semaphore::new(self.max_concurrency));
353        let multi_progress = MultiProgress::new();
354
355        // Create progress bars for each target
356        let progress_bars: Vec<ProgressBar> = (0..total_targets)
357            .map(|i| {
358                let pb = multi_progress.add(ProgressBar::new(1));
359                pb.set_style(
360                    ProgressStyle::default_bar()
361                        .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} {msg}")
362                        .unwrap(),
363                );
364                pb.set_message(format!("Target {}", i + 1));
365                pb
366            })
367            .collect();
368
369        // Spawn tasks for each target
370        let mut handles: Vec<JoinHandle<Result<TargetResult>>> = Vec::new();
371
372        for (index, target) in self.targets.iter().enumerate() {
373            let target = target.clone();
374            // Clone necessary fields from base_command instead of passing reference
375            let duration = self.base_command.duration.clone();
376            let vus = self.base_command.vus;
377            let scenario_str = self.base_command.scenario.clone();
378            let operations = self.base_command.operations.clone();
379            let auth = self.base_command.auth.clone();
380            let headers = self.base_command.headers.clone();
381            let threshold_percentile = self.base_command.threshold_percentile.clone();
382            let threshold_ms = self.base_command.threshold_ms;
383            let max_error_rate = self.base_command.max_error_rate;
384            let verbose = self.base_command.verbose;
385            let skip_tls_verify = self.base_command.skip_tls_verify;
386            let chunked_request_bodies = self.base_command.chunked_request_bodies;
387            let target_rps = self.base_command.target_rps;
388            let no_keep_alive = self.base_command.no_keep_alive;
389
390            // Select per-target templates/base_path if this target has a custom spec
391            let (templates, base_path) = if let Some(spec_path) = &target.spec {
392                if let Some((t, bp)) = per_target_data.get(spec_path) {
393                    (t.clone(), bp.clone())
394                } else {
395                    (templates.clone(), base_path.clone())
396                }
397            } else {
398                (templates.clone(), base_path.clone())
399            };
400
401            let base_headers = base_headers.clone();
402            let scenario = scenario.clone();
403            let duration_secs = duration_secs_val;
404            let base_output = self.base_output.clone();
405            let semaphore = semaphore.clone();
406            let progress_bar = progress_bars[index].clone();
407            let target_index = index;
408            let security_testing_enabled = security_testing_enabled_val;
409            let enhancement_code = enhancement_code.clone();
410
411            let handle = tokio::spawn(async move {
412                // Acquire semaphore permit
413                let _permit = semaphore.acquire().await.map_err(|e| {
414                    BenchError::Other(format!("Failed to acquire semaphore: {}", e))
415                })?;
416
417                progress_bar.set_message(format!("Testing {}", target.url));
418
419                // Execute test for this target
420                let result = Self::execute_single_target_internal(
421                    &duration,
422                    vus,
423                    &scenario_str,
424                    &operations,
425                    &auth,
426                    &headers,
427                    &threshold_percentile,
428                    threshold_ms,
429                    max_error_rate,
430                    verbose,
431                    skip_tls_verify,
432                    base_path.as_ref(),
433                    &target,
434                    target_index,
435                    &templates,
436                    &base_headers,
437                    &scenario,
438                    duration_secs,
439                    &base_output,
440                    security_testing_enabled,
441                    chunked_request_bodies,
442                    target_rps,
443                    no_keep_alive,
444                    &enhancement_code,
445                )
446                .await;
447
448                progress_bar.inc(1);
449                progress_bar.finish_with_message(format!("Completed {}", target.url));
450
451                result
452            });
453
454            handles.push(handle);
455        }
456
457        // Wait for all tasks to complete and collect results
458        let mut target_results = Vec::new();
459        for (index, handle) in handles.into_iter().enumerate() {
460            match handle.await {
461                Ok(Ok(result)) => {
462                    target_results.push(result);
463                }
464                Ok(Err(e)) => {
465                    // Create error result
466                    let target_url = self.targets[index].url.clone();
467                    target_results.push(TargetResult {
468                        target_url: target_url.clone(),
469                        target_index: index,
470                        results: K6Results::default(),
471                        output_dir: self.base_output.join(format!("target_{}", index + 1)),
472                        success: false,
473                        error: Some(e.to_string()),
474                    });
475                }
476                Err(e) => {
477                    // Join error
478                    let target_url = self.targets[index].url.clone();
479                    target_results.push(TargetResult {
480                        target_url: target_url.clone(),
481                        target_index: index,
482                        results: K6Results::default(),
483                        output_dir: self.base_output.join(format!("target_{}", index + 1)),
484                        success: false,
485                        error: Some(format!("Task join error: {}", e)),
486                    });
487                }
488            }
489        }
490
491        // Sort results by target index
492        target_results.sort_by_key(|r| r.target_index);
493
494        // Calculate aggregated metrics
495        let aggregated_metrics = AggregatedMetrics::from_results(&target_results);
496
497        let successful_targets = target_results.iter().filter(|r| r.success).count();
498        let failed_targets = total_targets - successful_targets;
499
500        Ok(AggregatedResults {
501            target_results,
502            total_targets,
503            successful_targets,
504            failed_targets,
505            aggregated_metrics,
506        })
507    }
508
509    /// Resolve the effective base path for API endpoints
510    fn resolve_base_path(&self, parser: &SpecParser) -> Option<String> {
511        // CLI option takes priority (including empty string to disable)
512        if let Some(cli_base_path) = &self.base_command.base_path {
513            if cli_base_path.is_empty() {
514                return None;
515            }
516            return Some(cli_base_path.clone());
517        }
518        // Fall back to spec's base path
519        parser.get_base_path()
520    }
521
522    /// Execute a single target test (internal method that doesn't require BenchCommand)
523    #[allow(clippy::too_many_arguments)]
524    async fn execute_single_target_internal(
525        _duration: &str,
526        vus: u32,
527        _scenario_str: &str,
528        _operations: &Option<String>,
529        auth: &Option<String>,
530        _headers: &Option<String>,
531        threshold_percentile: &str,
532        threshold_ms: u64,
533        max_error_rate: f64,
534        verbose: bool,
535        skip_tls_verify: bool,
536        base_path: Option<&String>,
537        target: &TargetConfig,
538        target_index: usize,
539        templates: &[crate::request_gen::RequestTemplate],
540        base_headers: &HashMap<String, String>,
541        scenario: &LoadScenario,
542        duration_secs: u64,
543        base_output: &Path,
544        security_testing_enabled: bool,
545        chunked_request_bodies: bool,
546        target_rps: Option<u32>,
547        no_keep_alive: bool,
548        enhancement_code: &str,
549    ) -> Result<TargetResult> {
550        // Merge target-specific headers with base headers
551        let mut custom_headers = base_headers.clone();
552        if let Some(target_headers) = &target.headers {
553            custom_headers.extend(target_headers.clone());
554        }
555
556        // Use target-specific auth if provided, otherwise use base auth
557        let auth_header = target.auth.as_ref().or(auth.as_ref()).cloned();
558
559        // Create k6 config for this target
560        let k6_config = K6Config {
561            target_url: target.url.clone(),
562            base_path: base_path.cloned(),
563            scenario: scenario.clone(),
564            duration_secs,
565            max_vus: vus,
566            threshold_percentile: threshold_percentile.to_string(),
567            threshold_ms,
568            max_error_rate,
569            auth_header,
570            custom_headers,
571            skip_tls_verify,
572            security_testing_enabled,
573            chunked_request_bodies,
574            target_rps,
575            no_keep_alive,
576            geo_source_ips: Vec::new(),
577            geo_source_headers: Vec::new(),
578        };
579
580        // Generate k6 script
581        let generator = K6ScriptGenerator::new(k6_config, templates.to_vec());
582        let mut script = generator.generate()?;
583
584        // Apply pre-computed enhancement code (security definitions, etc.)
585        if !enhancement_code.is_empty() {
586            if let Some(pos) = script.find("export const options") {
587                script.insert_str(pos, enhancement_code);
588            }
589        }
590
591        // Validate script
592        let validation_errors = K6ScriptGenerator::validate_script(&script);
593        if !validation_errors.is_empty() {
594            return Err(BenchError::Other(format!(
595                "Script validation failed for target {}: {}",
596                target.url,
597                validation_errors.join(", ")
598            )));
599        }
600
601        // Create output directory for this target
602        let output_dir = base_output.join(format!("target_{}", target_index + 1));
603        std::fs::create_dir_all(&output_dir)?;
604
605        // Write script to file
606        let script_path = output_dir.join("k6-script.js");
607        std::fs::write(&script_path, script)?;
608
609        // Execute k6 with a unique API server port per target to avoid port conflicts.
610        // k6 defaults to localhost:6565 for its REST API; parallel instances collide.
611        let api_port = 6565 + (target_index as u16) + 1; // 6566, 6567, ...
612        let executor = K6Executor::new()?;
613        let results = executor
614            .execute_with_port(&script_path, Some(&output_dir), verbose, Some(api_port))
615            .await;
616
617        match results {
618            Ok(k6_results) => Ok(TargetResult {
619                target_url: target.url.clone(),
620                target_index,
621                results: k6_results,
622                output_dir,
623                success: true,
624                error: None,
625            }),
626            Err(e) => Ok(TargetResult {
627                target_url: target.url.clone(),
628                target_index,
629                results: K6Results::default(),
630                output_dir,
631                success: false,
632                error: Some(e.to_string()),
633            }),
634        }
635    }
636}
637
#[cfg(test)]
mod tests {
    use super::*;

    /// Successful result for the target at `index` (URL `http://api<N>.com`, output
    /// directory `output<N>`, with `N = index + 1`) carrying the given k6 figures.
    fn passing(
        index: usize,
        total_requests: u64,
        failed_requests: u64,
        avg_duration_ms: f64,
        p95_duration_ms: f64,
        p99_duration_ms: f64,
    ) -> TargetResult {
        TargetResult {
            target_url: format!("http://api{}.com", index + 1),
            target_index: index,
            results: K6Results {
                total_requests,
                failed_requests,
                avg_duration_ms,
                p95_duration_ms,
                p99_duration_ms,
                ..Default::default()
            },
            output_dir: PathBuf::from(format!("output{}", index + 1)),
            success: true,
            error: None,
        }
    }

    /// Failed result for the target at `index`, with default metrics and `message` as error.
    fn failing(index: usize, message: &str) -> TargetResult {
        TargetResult {
            target_url: format!("http://api{}.com", index + 1),
            target_index: index,
            results: K6Results::default(),
            output_dir: PathBuf::from(format!("output{}", index + 1)),
            success: false,
            error: Some(message.to_string()),
        }
    }

    #[test]
    fn test_aggregated_metrics_from_results() {
        let results = vec![
            passing(0, 100, 5, 100.0, 200.0, 300.0),
            passing(1, 200, 10, 150.0, 250.0, 350.0),
        ];

        let metrics = AggregatedMetrics::from_results(&results);
        assert_eq!(metrics.total_requests, 300);
        assert_eq!(metrics.total_failed_requests, 15);
        assert_eq!(metrics.avg_duration_ms, 125.0); // (100 + 150) / 2
    }

    #[test]
    fn test_aggregated_metrics_with_failed_targets() {
        let results = vec![
            passing(0, 100, 5, 100.0, 200.0, 300.0),
            failing(1, "Network error"),
        ];

        let metrics = AggregatedMetrics::from_results(&results);
        // The failed target must not contribute to any figure
        assert_eq!(metrics.total_requests, 100);
        assert_eq!(metrics.total_failed_requests, 5);
        assert_eq!(metrics.avg_duration_ms, 100.0);
    }

    #[test]
    fn test_aggregated_metrics_empty_results() {
        let metrics = AggregatedMetrics::from_results(&[]);
        assert_eq!(metrics.total_requests, 0);
        assert_eq!(metrics.total_failed_requests, 0);
        assert_eq!(metrics.avg_duration_ms, 0.0);
        assert_eq!(metrics.error_rate, 0.0);
    }

    #[test]
    fn test_aggregated_metrics_error_rate_calculation() {
        let results = vec![passing(0, 1000, 50, 100.0, 200.0, 300.0)];

        let metrics = AggregatedMetrics::from_results(&results);
        assert_eq!(metrics.error_rate, 5.0); // 50/1000 * 100
    }

    #[test]
    fn test_aggregated_metrics_p95_p99_calculation() {
        let results = vec![
            passing(0, 100, 0, 100.0, 150.0, 200.0),
            passing(1, 100, 0, 200.0, 250.0, 300.0),
            passing(2, 100, 0, 300.0, 350.0, 400.0),
        ];

        let metrics = AggregatedMetrics::from_results(&results);
        // With three values both ranks land on the last (largest) element:
        // p95 of [150, 250, 350] -> index 2 -> 350
        // p99 of [200, 300, 400] -> index 2 -> 400
        assert_eq!(metrics.p95_duration_ms, 350.0);
        assert_eq!(metrics.p99_duration_ms, 400.0);
    }
}