Skip to main content

fluxbench_cli/executor/
execution.rs

1//! Benchmark Execution
2//!
3//! Core execution logic for running benchmarks, including both in-process
4//! and isolated (IPC-based) execution modes.
5//!
6//! ## Execution Modes
7//!
8//! - **In-process (`Executor`)**: Runs benchmarks in the same process. Fast but
9//!   a panic in one benchmark will crash the entire run. Best for development.
10//!
11//! - **Isolated (`IsolatedExecutor`)**: Spawns worker processes via IPC. Provides
12//!   crash isolation - a panic in one benchmark won't affect others. Recommended
13//!   for production runs.
14//!
15//! ## Data Flow
16//!
17//! ```text
18//! BenchmarkDef (from inventory)
19//!        │
20//!        ▼
21//!   ExecutionConfig
22//!        │
23//!        ▼
24//! ┌──────────────────┐
25//! │  Executor/       │  Warmup → Measurement → Sample Collection
26//! │  IsolatedExecutor│
27//! └────────┬─────────┘
28//!          │
29//!          ▼
30//!  BenchExecutionResult (samples, status, allocations, cycles)
31//! ```
32
33use crate::supervisor::{IpcBenchmarkResult, IpcBenchmarkStatus, Supervisor};
34use fluxbench_core::{Bencher, BenchmarkDef, run_benchmark_loop};
35use fluxbench_ipc::BenchmarkConfig;
36use fluxbench_report::BenchmarkStatus;
37use indicatif::{ProgressBar, ProgressStyle};
38use std::time::{Duration, Instant};
39
/// Timing and statistics settings that drive benchmark execution.
///
/// A global instance is combined with per-benchmark overrides through
/// `ExecutionConfig::resolve_for_benchmark` before each benchmark runs.
#[derive(Clone, Debug)]
pub struct ExecutionConfig {
    /// Time spent warming up before measurement, in nanoseconds.
    pub warmup_time_ns: u64,
    /// Time budget for the measurement phase, in nanoseconds.
    pub measurement_time_ns: u64,
    /// Optional lower bound on the number of iterations.
    pub min_iterations: Option<u64>,
    /// Optional upper bound on the number of iterations.
    pub max_iterations: Option<u64>,
    /// Whether allocations are recorded while the benchmark runs.
    pub track_allocations: bool,
    /// Number of bootstrap iterations used when computing statistics.
    pub bootstrap_iterations: usize,
    /// Confidence level used for the reported intervals (e.g. `0.95`).
    pub confidence_level: f64,
}
58
59impl ExecutionConfig {
60    /// Merge per-benchmark configuration overrides with global defaults.
61    ///
62    /// Priority:
63    /// 1. Per-benchmark `samples` (if set): overrides everything, runs fixed N iterations with no warmup
64    /// 2. Per-benchmark `warmup_ns`/`measurement_ns`: override global values
65    /// 3. Per-benchmark `min/max_iterations`: override global values
66    /// 4. Falls back to global config for anything not overridden
67    pub fn resolve_for_benchmark(&self, bench: &BenchmarkDef) -> ExecutionConfig {
68        // Fixed sample count mode: per-bench samples override everything
69        if let Some(n) = bench.samples {
70            return ExecutionConfig {
71                warmup_time_ns: 0,
72                measurement_time_ns: 0,
73                min_iterations: Some(n),
74                max_iterations: Some(n),
75                ..self.clone()
76            };
77        }
78
79        ExecutionConfig {
80            warmup_time_ns: bench.warmup_ns.unwrap_or(self.warmup_time_ns),
81            measurement_time_ns: bench.measurement_ns.unwrap_or(self.measurement_time_ns),
82            min_iterations: bench.min_iterations.or(self.min_iterations),
83            max_iterations: bench.max_iterations.or(self.max_iterations),
84            ..self.clone()
85        }
86    }
87}
88
89impl Default for ExecutionConfig {
90    fn default() -> Self {
91        Self {
92            warmup_time_ns: 3_000_000_000,      // 3 seconds
93            measurement_time_ns: 5_000_000_000, // 5 seconds
94            min_iterations: Some(100),
95            max_iterations: None,
96            track_allocations: true,
97            bootstrap_iterations: 100_000, // Matches Criterion default
98            confidence_level: 0.95,
99        }
100    }
101}
102
103/// Result from executing a single benchmark
104#[derive(Debug)]
105pub struct BenchExecutionResult {
106    pub benchmark_id: String,
107    pub benchmark_name: String,
108    pub group: String,
109    pub file: String,
110    pub line: u32,
111    pub status: BenchmarkStatus,
112    pub samples: Vec<f64>,
113    /// CPU cycles per sample (parallel with samples)
114    pub cpu_cycles: Vec<u64>,
115    pub alloc_bytes: u64,
116    pub alloc_count: u64,
117    pub duration_ns: u64,
118    pub error_message: Option<String>,
119    pub failure_kind: Option<String>,
120    pub backtrace: Option<String>,
121    pub severity: fluxbench_core::Severity,
122}
123
124/// Execute benchmarks and produce results (in-process mode)
125pub struct Executor {
126    config: ExecutionConfig,
127    results: Vec<BenchExecutionResult>,
128}
129
130impl Executor {
131    /// Create a new in-process executor with the given configuration
132    pub fn new(config: ExecutionConfig) -> Self {
133        Self {
134            config,
135            results: Vec::new(),
136        }
137    }
138
139    /// Execute all provided benchmarks
140    pub fn execute(&mut self, benchmarks: &[&BenchmarkDef]) -> Vec<BenchExecutionResult> {
141        let pb = ProgressBar::new(benchmarks.len() as u64);
142        pb.set_style(
143            ProgressStyle::default_bar()
144                .template(
145                    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}",
146                )
147                .unwrap_or_else(|_| ProgressStyle::default_bar())
148                .progress_chars("#>-"),
149        );
150
151        for bench in benchmarks {
152            pb.set_message(bench.id.to_string());
153            let result = self.execute_single(bench);
154            self.results.push(result);
155            pb.inc(1);
156        }
157
158        pb.finish_with_message("Complete");
159        std::mem::take(&mut self.results)
160    }
161
162    /// Execute a single benchmark
163    fn execute_single(&self, bench: &BenchmarkDef) -> BenchExecutionResult {
164        let start = Instant::now();
165        let cfg = self.config.resolve_for_benchmark(bench);
166
167        // Run with panic catching
168        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
169            let bencher = Bencher::new(cfg.track_allocations);
170
171            run_benchmark_loop(
172                bencher,
173                |b| (bench.runner_fn)(b),
174                cfg.warmup_time_ns,
175                cfg.measurement_time_ns,
176                cfg.min_iterations,
177                cfg.max_iterations,
178            )
179        }));
180
181        let duration_ns = start.elapsed().as_nanos() as u64;
182
183        match result {
184            Ok(bench_result) => {
185                // Extract timing samples as f64 for statistics
186                let samples: Vec<f64> = bench_result
187                    .samples
188                    .iter()
189                    .map(|s| s.duration_nanos as f64)
190                    .collect();
191
192                // Extract CPU cycles (parallel array with samples)
193                let cpu_cycles: Vec<u64> =
194                    bench_result.samples.iter().map(|s| s.cpu_cycles).collect();
195
196                // Sum allocations
197                let alloc_bytes: u64 = bench_result.samples.iter().map(|s| s.alloc_bytes).sum();
198                let alloc_count: u64 = bench_result
199                    .samples
200                    .iter()
201                    .map(|s| s.alloc_count as u64)
202                    .sum();
203
204                BenchExecutionResult {
205                    benchmark_id: bench.id.to_string(),
206                    benchmark_name: bench.name.to_string(),
207                    group: bench.group.to_string(),
208                    file: bench.file.to_string(),
209                    line: bench.line,
210                    status: BenchmarkStatus::Passed,
211                    samples,
212                    cpu_cycles,
213                    alloc_bytes,
214                    alloc_count,
215                    duration_ns,
216                    error_message: None,
217                    failure_kind: None,
218                    backtrace: None,
219                    severity: bench.severity,
220                }
221            }
222            Err(panic) => {
223                let message = if let Some(s) = panic.downcast_ref::<&str>() {
224                    s.to_string()
225                } else if let Some(s) = panic.downcast_ref::<String>() {
226                    s.clone()
227                } else {
228                    "Unknown panic".to_string()
229                };
230
231                BenchExecutionResult {
232                    benchmark_id: bench.id.to_string(),
233                    benchmark_name: bench.name.to_string(),
234                    group: bench.group.to_string(),
235                    file: bench.file.to_string(),
236                    line: bench.line,
237                    status: BenchmarkStatus::Crashed,
238                    samples: Vec::new(),
239                    cpu_cycles: Vec::new(),
240                    alloc_bytes: 0,
241                    alloc_count: 0,
242                    duration_ns,
243                    error_message: Some(message),
244                    failure_kind: Some("panic".to_string()),
245                    backtrace: None,
246                    severity: bench.severity,
247                }
248            }
249        }
250    }
251}
252
253/// Executor that runs benchmarks in isolated worker processes via IPC
254///
255/// This provides crash isolation - if a benchmark panics or crashes,
256/// it won't take down the supervisor process.
257pub struct IsolatedExecutor {
258    config: ExecutionConfig,
259    timeout: Duration,
260    reuse_workers: bool,
261    num_workers: usize,
262}
263
264impl IsolatedExecutor {
265    /// Create a new isolated executor
266    pub fn new(
267        config: ExecutionConfig,
268        timeout: Duration,
269        reuse_workers: bool,
270        num_workers: usize,
271    ) -> Self {
272        Self {
273            config,
274            timeout,
275            reuse_workers,
276            num_workers: num_workers.max(1),
277        }
278    }
279
280    /// Execute all provided benchmarks in isolated worker processes
281    pub fn execute(&self, benchmarks: &[&BenchmarkDef]) -> Vec<BenchExecutionResult> {
282        let pb = ProgressBar::new(benchmarks.len() as u64);
283        pb.set_style(
284            ProgressStyle::default_bar()
285                .template(
286                    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}",
287                )
288                .unwrap_or_else(|_| ProgressStyle::default_bar())
289                .progress_chars("#>-"),
290        );
291        pb.set_message("Starting isolated workers...");
292
293        // Build per-benchmark IPC configs
294        let ipc_configs: Vec<BenchmarkConfig> = benchmarks
295            .iter()
296            .map(|bench| {
297                let cfg = self.config.resolve_for_benchmark(bench);
298                BenchmarkConfig {
299                    warmup_time_ns: cfg.warmup_time_ns,
300                    measurement_time_ns: cfg.measurement_time_ns,
301                    min_iterations: cfg.min_iterations,
302                    max_iterations: cfg.max_iterations,
303                    track_allocations: cfg.track_allocations,
304                    fail_on_allocation: false,
305                    timeout_ns: self.timeout.as_nanos() as u64,
306                }
307            })
308            .collect();
309
310        // Use the first config as default for the supervisor
311        let default_config = ipc_configs.first().cloned().unwrap_or(BenchmarkConfig {
312            warmup_time_ns: self.config.warmup_time_ns,
313            measurement_time_ns: self.config.measurement_time_ns,
314            min_iterations: self.config.min_iterations,
315            max_iterations: self.config.max_iterations,
316            track_allocations: self.config.track_allocations,
317            fail_on_allocation: false,
318            timeout_ns: self.timeout.as_nanos() as u64,
319        });
320
321        let supervisor = Supervisor::new(default_config, self.timeout, self.num_workers);
322
323        // Run benchmarks via IPC with per-benchmark configs
324        let ipc_results = if self.reuse_workers {
325            supervisor.run_with_reuse_configs(benchmarks, &ipc_configs)
326        } else {
327            supervisor.run_all_configs(benchmarks, &ipc_configs)
328        };
329
330        // Convert IPC results to BenchExecutionResult
331        let mut results = Vec::with_capacity(benchmarks.len());
332
333        match ipc_results {
334            Ok(ipc_results) => {
335                for (ipc_result, bench) in ipc_results.into_iter().zip(benchmarks.iter()) {
336                    pb.set_message(bench.id.to_string());
337                    results.push(self.convert_ipc_result(ipc_result, bench));
338                    pb.inc(1);
339                }
340            }
341            Err(e) => {
342                // Supervisor-level failure - mark all as crashed
343                for bench in benchmarks {
344                    results.push(BenchExecutionResult {
345                        benchmark_id: bench.id.to_string(),
346                        benchmark_name: bench.name.to_string(),
347                        group: bench.group.to_string(),
348                        file: bench.file.to_string(),
349                        line: bench.line,
350                        status: BenchmarkStatus::Crashed,
351                        samples: Vec::new(),
352                        cpu_cycles: Vec::new(),
353                        alloc_bytes: 0,
354                        alloc_count: 0,
355                        duration_ns: 0,
356                        error_message: Some(format!("Supervisor error: {}", e)),
357                        failure_kind: Some("crashed".to_string()),
358                        backtrace: None,
359                        severity: bench.severity,
360                    });
361                    pb.inc(1);
362                }
363            }
364        }
365
366        pb.finish_with_message("Complete (isolated)");
367        results
368    }
369
370    /// Convert an IPC result to a BenchExecutionResult
371    fn convert_ipc_result(
372        &self,
373        ipc_result: IpcBenchmarkResult,
374        bench: &BenchmarkDef,
375    ) -> BenchExecutionResult {
376        let (status, error_message, failure_kind, backtrace) = match ipc_result.status {
377            IpcBenchmarkStatus::Success => (BenchmarkStatus::Passed, None, None, None),
378            IpcBenchmarkStatus::Failed {
379                message,
380                kind,
381                backtrace,
382            } => (
383                BenchmarkStatus::Failed,
384                Some(message),
385                Some(kind),
386                backtrace,
387            ),
388            IpcBenchmarkStatus::Crashed {
389                message,
390                kind,
391                backtrace,
392            } => (
393                BenchmarkStatus::Crashed,
394                Some(message),
395                Some(kind),
396                backtrace,
397            ),
398        };
399
400        // Extract timing samples as f64 for statistics
401        let samples: Vec<f64> = ipc_result
402            .samples
403            .iter()
404            .map(|s| s.duration_nanos as f64)
405            .collect();
406
407        // Extract CPU cycles
408        let cpu_cycles: Vec<u64> = ipc_result.samples.iter().map(|s| s.cpu_cycles).collect();
409
410        // Sum allocations
411        let alloc_bytes: u64 = ipc_result.samples.iter().map(|s| s.alloc_bytes).sum();
412        let alloc_count: u64 = ipc_result
413            .samples
414            .iter()
415            .map(|s| s.alloc_count as u64)
416            .sum();
417
418        BenchExecutionResult {
419            benchmark_id: bench.id.to_string(),
420            benchmark_name: bench.name.to_string(),
421            group: bench.group.to_string(),
422            file: bench.file.to_string(),
423            line: bench.line,
424            status,
425            samples,
426            cpu_cycles,
427            alloc_bytes,
428            alloc_count,
429            duration_ns: ipc_result.total_duration_nanos,
430            error_message,
431            failure_kind,
432            backtrace,
433            severity: bench.severity,
434        }
435    }
436}