Skip to main content

fluxbench_cli/executor/
execution.rs

1//! Benchmark Execution
2//!
3//! Core execution logic for running benchmarks, including both in-process
4//! and isolated (IPC-based) execution modes.
5//!
6//! ## Execution Modes
7//!
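//! - **In-process (`Executor`)**: Runs benchmarks in the same process. Fast, and
//!   unwinding panics are caught and reported as crashed, but an abort or other
//!   non-unwinding failure in one benchmark will still take down the entire run.
//!   Best for development.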
10//!
11//! - **Isolated (`IsolatedExecutor`)**: Spawns worker processes via IPC. Provides
12//!   crash isolation - a panic in one benchmark won't affect others. Recommended
13//!   for production runs.
14//!
15//! ## Data Flow
16//!
17//! ```text
18//! BenchmarkDef (from inventory)
19//!        │
20//!        ▼
21//!   ExecutionConfig
22//!        │
23//!        ▼
24//! ┌──────────────────┐
25//! │  Executor/       │  Warmup → Measurement → Sample Collection
26//! │  IsolatedExecutor│
27//! └────────┬─────────┘
28//!          │
29//!          ▼
30//!  BenchExecutionResult (samples, status, allocations, cycles)
31//! ```
32
33use crate::supervisor::{IpcBenchmarkResult, IpcBenchmarkStatus, Supervisor};
34use fluxbench_core::{Bencher, BenchmarkDef, run_benchmark_loop};
35use fluxbench_ipc::BenchmarkConfig;
36use fluxbench_report::BenchmarkStatus;
37use indicatif::{ProgressBar, ProgressStyle};
38use std::time::{Duration, Instant};
39
/// Configuration for benchmark execution
///
/// An instance acts as the set of global defaults for a run; per-benchmark
/// overrides are merged on top of it by `resolve_for_benchmark`.
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
    /// Warmup time in nanoseconds (forced to 0 when a benchmark requests a
    /// fixed sample count)
    pub warmup_time_ns: u64,
    /// Measurement time in nanoseconds (forced to 0 when a benchmark requests
    /// a fixed sample count)
    pub measurement_time_ns: u64,
    /// Minimum iterations; `None` means no lower bound is configured
    pub min_iterations: Option<u64>,
    /// Maximum iterations; `None` means no upper bound is configured
    pub max_iterations: Option<u64>,
    /// Track allocations (handed to the `Bencher` / IPC worker config)
    pub track_allocations: bool,
    /// Number of bootstrap iterations for statistics
    // NOTE(review): not read anywhere in this module; presumably consumed by
    // the statistics stage - confirm.
    pub bootstrap_iterations: usize,
    /// Confidence level for intervals
    // NOTE(review): not read anywhere in this module; presumably a fraction
    // in (0, 1) such as the default 0.95 - confirm.
    pub confidence_level: f64,
}
58
59impl ExecutionConfig {
60    /// Merge per-benchmark configuration overrides with global defaults.
61    ///
62    /// Priority:
63    /// 1. Per-benchmark `samples` (if set): overrides everything, runs fixed N iterations with no warmup
64    /// 2. Per-benchmark `warmup_ns`/`measurement_ns`: override global values
65    /// 3. Per-benchmark `min/max_iterations`: override global values
66    /// 4. Falls back to global config for anything not overridden
67    pub fn resolve_for_benchmark(&self, bench: &BenchmarkDef) -> ExecutionConfig {
68        // Fixed sample count mode: per-bench samples override everything
69        if let Some(n) = bench.samples {
70            return ExecutionConfig {
71                warmup_time_ns: 0,
72                measurement_time_ns: 0,
73                min_iterations: Some(n),
74                max_iterations: Some(n),
75                ..self.clone()
76            };
77        }
78
79        ExecutionConfig {
80            warmup_time_ns: bench.warmup_ns.unwrap_or(self.warmup_time_ns),
81            measurement_time_ns: bench.measurement_ns.unwrap_or(self.measurement_time_ns),
82            min_iterations: bench.min_iterations.or(self.min_iterations),
83            max_iterations: bench.max_iterations.or(self.max_iterations),
84            ..self.clone()
85        }
86    }
87}
88
89impl Default for ExecutionConfig {
90    fn default() -> Self {
91        Self {
92            warmup_time_ns: 3_000_000_000,      // 3 seconds
93            measurement_time_ns: 5_000_000_000, // 5 seconds
94            min_iterations: Some(100),
95            max_iterations: None,
96            track_allocations: true,
97            bootstrap_iterations: 100_000, // Matches Criterion default
98            confidence_level: 0.95,
99        }
100    }
101}
102
/// Result from executing a single benchmark
///
/// Produced by both the in-process `Executor` and the `IsolatedExecutor`;
/// one value is emitted per benchmark regardless of whether it passed.
#[derive(Debug)]
pub struct BenchExecutionResult {
    /// Benchmark identifier, copied from the `BenchmarkDef`
    pub benchmark_id: String,
    /// Benchmark name, copied from the `BenchmarkDef`
    pub benchmark_name: String,
    /// Group the benchmark belongs to, copied from the `BenchmarkDef`
    pub group: String,
    /// Source file recorded in the `BenchmarkDef`
    pub file: String,
    /// Source line recorded in the `BenchmarkDef`
    pub line: u32,
    /// Outcome of the run (passed, failed or crashed)
    pub status: BenchmarkStatus,
    /// Per-sample durations in nanoseconds, converted to `f64` for statistics;
    /// empty when the benchmark panicked or the supervisor failed
    pub samples: Vec<f64>,
    /// CPU cycles per sample (parallel with samples)
    pub cpu_cycles: Vec<u64>,
    /// Allocated bytes, summed over all samples
    pub alloc_bytes: u64,
    /// Allocation count, summed over all samples
    pub alloc_count: u64,
    /// Total duration of the run in nanoseconds: elapsed time measured around
    /// the run in-process, the worker-reported total for isolated runs, and 0
    /// when the supervisor itself failed
    pub duration_ns: u64,
    /// Human-readable failure description; `None` when the benchmark passed
    pub error_message: Option<String>,
    /// Failure category, e.g. `"panic"` (in-process), `"crashed"` (supervisor
    /// failure) or the kind reported by the worker; `None` when passed
    pub failure_kind: Option<String>,
    /// Backtrace, if one was reported over IPC; the in-process executor
    /// always leaves this as `None`
    pub backtrace: Option<String>,
    /// Severity, copied from the `BenchmarkDef`
    pub severity: fluxbench_core::Severity,
}
123
/// Execute benchmarks and produce results (in-process mode)
///
/// Benchmarks run one after another inside the current process, each wrapped
/// in `catch_unwind` so an unwinding panic is recorded as a crashed result.
pub struct Executor {
    /// Global defaults; per-benchmark overrides are resolved against this
    config: ExecutionConfig,
    /// Scratch buffer filled during `execute` and moved out when it returns
    results: Vec<BenchExecutionResult>,
}
129
130impl Executor {
131    pub fn new(config: ExecutionConfig) -> Self {
132        Self {
133            config,
134            results: Vec::new(),
135        }
136    }
137
138    /// Execute all provided benchmarks
139    pub fn execute(&mut self, benchmarks: &[&BenchmarkDef]) -> Vec<BenchExecutionResult> {
140        let pb = ProgressBar::new(benchmarks.len() as u64);
141        pb.set_style(
142            ProgressStyle::default_bar()
143                .template(
144                    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}",
145                )
146                .unwrap_or_else(|_| ProgressStyle::default_bar())
147                .progress_chars("#>-"),
148        );
149
150        for bench in benchmarks {
151            pb.set_message(bench.id.to_string());
152            let result = self.execute_single(bench);
153            self.results.push(result);
154            pb.inc(1);
155        }
156
157        pb.finish_with_message("Complete");
158        std::mem::take(&mut self.results)
159    }
160
161    /// Execute a single benchmark
162    fn execute_single(&self, bench: &BenchmarkDef) -> BenchExecutionResult {
163        let start = Instant::now();
164        let cfg = self.config.resolve_for_benchmark(bench);
165
166        // Run with panic catching
167        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
168            let bencher = Bencher::new(cfg.track_allocations);
169
170            run_benchmark_loop(
171                bencher,
172                |b| (bench.runner_fn)(b),
173                cfg.warmup_time_ns,
174                cfg.measurement_time_ns,
175                cfg.min_iterations,
176                cfg.max_iterations,
177            )
178        }));
179
180        let duration_ns = start.elapsed().as_nanos() as u64;
181
182        match result {
183            Ok(bench_result) => {
184                // Extract timing samples as f64 for statistics
185                let samples: Vec<f64> = bench_result
186                    .samples
187                    .iter()
188                    .map(|s| s.duration_nanos as f64)
189                    .collect();
190
191                // Extract CPU cycles (parallel array with samples)
192                let cpu_cycles: Vec<u64> =
193                    bench_result.samples.iter().map(|s| s.cpu_cycles).collect();
194
195                // Sum allocations
196                let alloc_bytes: u64 = bench_result.samples.iter().map(|s| s.alloc_bytes).sum();
197                let alloc_count: u64 = bench_result
198                    .samples
199                    .iter()
200                    .map(|s| s.alloc_count as u64)
201                    .sum();
202
203                BenchExecutionResult {
204                    benchmark_id: bench.id.to_string(),
205                    benchmark_name: bench.name.to_string(),
206                    group: bench.group.to_string(),
207                    file: bench.file.to_string(),
208                    line: bench.line,
209                    status: BenchmarkStatus::Passed,
210                    samples,
211                    cpu_cycles,
212                    alloc_bytes,
213                    alloc_count,
214                    duration_ns,
215                    error_message: None,
216                    failure_kind: None,
217                    backtrace: None,
218                    severity: bench.severity,
219                }
220            }
221            Err(panic) => {
222                let message = if let Some(s) = panic.downcast_ref::<&str>() {
223                    s.to_string()
224                } else if let Some(s) = panic.downcast_ref::<String>() {
225                    s.clone()
226                } else {
227                    "Unknown panic".to_string()
228                };
229
230                BenchExecutionResult {
231                    benchmark_id: bench.id.to_string(),
232                    benchmark_name: bench.name.to_string(),
233                    group: bench.group.to_string(),
234                    file: bench.file.to_string(),
235                    line: bench.line,
236                    status: BenchmarkStatus::Crashed,
237                    samples: Vec::new(),
238                    cpu_cycles: Vec::new(),
239                    alloc_bytes: 0,
240                    alloc_count: 0,
241                    duration_ns,
242                    error_message: Some(message),
243                    failure_kind: Some("panic".to_string()),
244                    backtrace: None,
245                    severity: bench.severity,
246                }
247            }
248        }
249    }
250}
251
/// Executor that runs benchmarks in isolated worker processes via IPC
///
/// This provides crash isolation - if a benchmark panics or crashes,
/// it won't take down the supervisor process.
pub struct IsolatedExecutor {
    /// Global defaults; per-benchmark overrides are resolved against this
    config: ExecutionConfig,
    /// Timeout handed to the `Supervisor` and sent to workers as `timeout_ns`
    timeout: Duration,
    /// When true the supervisor's worker-reuse entry point is used; otherwise
    /// its run-all entry point is used
    reuse_workers: bool,
    /// Number of workers passed to the `Supervisor`; `new` clamps it to >= 1
    num_workers: usize,
}
262
263impl IsolatedExecutor {
264    /// Create a new isolated executor
265    pub fn new(
266        config: ExecutionConfig,
267        timeout: Duration,
268        reuse_workers: bool,
269        num_workers: usize,
270    ) -> Self {
271        Self {
272            config,
273            timeout,
274            reuse_workers,
275            num_workers: num_workers.max(1),
276        }
277    }
278
279    /// Execute all provided benchmarks in isolated worker processes
280    pub fn execute(&self, benchmarks: &[&BenchmarkDef]) -> Vec<BenchExecutionResult> {
281        let pb = ProgressBar::new(benchmarks.len() as u64);
282        pb.set_style(
283            ProgressStyle::default_bar()
284                .template(
285                    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}",
286                )
287                .unwrap_or_else(|_| ProgressStyle::default_bar())
288                .progress_chars("#>-"),
289        );
290        pb.set_message("Starting isolated workers...");
291
292        // Build per-benchmark IPC configs
293        let ipc_configs: Vec<BenchmarkConfig> = benchmarks
294            .iter()
295            .map(|bench| {
296                let cfg = self.config.resolve_for_benchmark(bench);
297                BenchmarkConfig {
298                    warmup_time_ns: cfg.warmup_time_ns,
299                    measurement_time_ns: cfg.measurement_time_ns,
300                    min_iterations: cfg.min_iterations,
301                    max_iterations: cfg.max_iterations,
302                    track_allocations: cfg.track_allocations,
303                    fail_on_allocation: false,
304                    timeout_ns: self.timeout.as_nanos() as u64,
305                }
306            })
307            .collect();
308
309        // Use the first config as default for the supervisor
310        let default_config = ipc_configs.first().cloned().unwrap_or(BenchmarkConfig {
311            warmup_time_ns: self.config.warmup_time_ns,
312            measurement_time_ns: self.config.measurement_time_ns,
313            min_iterations: self.config.min_iterations,
314            max_iterations: self.config.max_iterations,
315            track_allocations: self.config.track_allocations,
316            fail_on_allocation: false,
317            timeout_ns: self.timeout.as_nanos() as u64,
318        });
319
320        let supervisor = Supervisor::new(default_config, self.timeout, self.num_workers);
321
322        // Run benchmarks via IPC with per-benchmark configs
323        let ipc_results = if self.reuse_workers {
324            supervisor.run_with_reuse_configs(benchmarks, &ipc_configs)
325        } else {
326            supervisor.run_all_configs(benchmarks, &ipc_configs)
327        };
328
329        // Convert IPC results to BenchExecutionResult
330        let mut results = Vec::with_capacity(benchmarks.len());
331
332        match ipc_results {
333            Ok(ipc_results) => {
334                for (ipc_result, bench) in ipc_results.into_iter().zip(benchmarks.iter()) {
335                    pb.set_message(bench.id.to_string());
336                    results.push(self.convert_ipc_result(ipc_result, bench));
337                    pb.inc(1);
338                }
339            }
340            Err(e) => {
341                // Supervisor-level failure - mark all as crashed
342                for bench in benchmarks {
343                    results.push(BenchExecutionResult {
344                        benchmark_id: bench.id.to_string(),
345                        benchmark_name: bench.name.to_string(),
346                        group: bench.group.to_string(),
347                        file: bench.file.to_string(),
348                        line: bench.line,
349                        status: BenchmarkStatus::Crashed,
350                        samples: Vec::new(),
351                        cpu_cycles: Vec::new(),
352                        alloc_bytes: 0,
353                        alloc_count: 0,
354                        duration_ns: 0,
355                        error_message: Some(format!("Supervisor error: {}", e)),
356                        failure_kind: Some("crashed".to_string()),
357                        backtrace: None,
358                        severity: bench.severity,
359                    });
360                    pb.inc(1);
361                }
362            }
363        }
364
365        pb.finish_with_message("Complete (isolated)");
366        results
367    }
368
369    /// Convert an IPC result to a BenchExecutionResult
370    fn convert_ipc_result(
371        &self,
372        ipc_result: IpcBenchmarkResult,
373        bench: &BenchmarkDef,
374    ) -> BenchExecutionResult {
375        let (status, error_message, failure_kind, backtrace) = match ipc_result.status {
376            IpcBenchmarkStatus::Success => (BenchmarkStatus::Passed, None, None, None),
377            IpcBenchmarkStatus::Failed {
378                message,
379                kind,
380                backtrace,
381            } => (
382                BenchmarkStatus::Failed,
383                Some(message),
384                Some(kind),
385                backtrace,
386            ),
387            IpcBenchmarkStatus::Crashed {
388                message,
389                kind,
390                backtrace,
391            } => (
392                BenchmarkStatus::Crashed,
393                Some(message),
394                Some(kind),
395                backtrace,
396            ),
397        };
398
399        // Extract timing samples as f64 for statistics
400        let samples: Vec<f64> = ipc_result
401            .samples
402            .iter()
403            .map(|s| s.duration_nanos as f64)
404            .collect();
405
406        // Extract CPU cycles
407        let cpu_cycles: Vec<u64> = ipc_result.samples.iter().map(|s| s.cpu_cycles).collect();
408
409        // Sum allocations
410        let alloc_bytes: u64 = ipc_result.samples.iter().map(|s| s.alloc_bytes).sum();
411        let alloc_count: u64 = ipc_result
412            .samples
413            .iter()
414            .map(|s| s.alloc_count as u64)
415            .sum();
416
417        BenchExecutionResult {
418            benchmark_id: bench.id.to_string(),
419            benchmark_name: bench.name.to_string(),
420            group: bench.group.to_string(),
421            file: bench.file.to_string(),
422            line: bench.line,
423            status,
424            samples,
425            cpu_cycles,
426            alloc_bytes,
427            alloc_count,
428            duration_ns: ipc_result.total_duration_nanos,
429            error_message,
430            failure_kind,
431            backtrace,
432            severity: bench.severity,
433        }
434    }
435}