fluxbench_cli/executor/execution.rs

//! Benchmark Execution
//!
//! Core execution logic for running benchmarks, including both in-process
//! and isolated (IPC-based) execution modes.
//!
//! ## Execution Modes
//!
//! - **In-process (`Executor`)**: Runs benchmarks in the same process. Fast, but
//!   a panic in one benchmark will crash the entire run. Best for development.
//!
//! - **Isolated (`IsolatedExecutor`)**: Spawns worker processes via IPC. Provides
//!   crash isolation - a panic in one benchmark won't affect others. Recommended
//!   for production runs.
//!
//! ## Data Flow
//!
//! ```text
//! BenchmarkDef (from inventory)
//!        │
//!        ▼
//!   ExecutionConfig
//!        │
//!        ▼
//! ┌──────────────────┐
//! │  Executor/       │  Warmup → Measurement → Sample Collection
//! │  IsolatedExecutor│
//! └────────┬─────────┘
//!          │
//!          ▼
//!  BenchExecutionResult (samples, status, allocations, cycles)
//! ```
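//!
//! ## Example
//!
//! A minimal in-process usage sketch (illustrative only; `collect_benchmark_defs`
//! is a hypothetical stand-in for however `BenchmarkDef`s are gathered from the
//! `inventory` registry):
//!
//! ```ignore
//! let defs: Vec<&BenchmarkDef> = collect_benchmark_defs();
//! let mut executor = Executor::new(ExecutionConfig::default());
//! let results = executor.execute(&defs);
//! for r in &results {
//!     println!("{}: {:?} ({} samples)", r.benchmark_id, r.status, r.samples.len());
//! }
//! ```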

use crate::supervisor::{IpcBenchmarkResult, IpcBenchmarkStatus, Supervisor};
use fluxbench_core::{Bencher, BenchmarkDef, run_benchmark_loop};
use fluxbench_ipc::BenchmarkConfig;
use fluxbench_report::BenchmarkStatus;
use indicatif::{ProgressBar, ProgressStyle};
use std::time::{Duration, Instant};

/// Configuration for benchmark execution
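///
/// A sketch of overriding a couple of fields while keeping the rest of the
/// defaults (the values here are illustrative, not recommendations):
///
/// ```ignore
/// let cfg = ExecutionConfig {
///     warmup_time_ns: 1_000_000_000,      // 1 s warmup
///     measurement_time_ns: 2_000_000_000, // 2 s measurement
///     ..ExecutionConfig::default()
/// };
/// ```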
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
    /// Warmup time in nanoseconds
    pub warmup_time_ns: u64,
    /// Measurement time in nanoseconds
    pub measurement_time_ns: u64,
    /// Minimum iterations
    pub min_iterations: Option<u64>,
    /// Maximum iterations
    pub max_iterations: Option<u64>,
    /// Track allocations
    pub track_allocations: bool,
    /// Number of bootstrap iterations for statistics
    pub bootstrap_iterations: usize,
    /// Confidence level for intervals
    pub confidence_level: f64,
}

impl ExecutionConfig {
    /// Merge per-benchmark configuration overrides with global defaults.
    ///
    /// Priority:
    /// 1. Per-benchmark `samples` (if set): overrides everything and runs exactly N iterations with no warmup
    /// 2. Per-benchmark `warmup_ns`/`measurement_ns`: override global values
    /// 3. Per-benchmark `min/max_iterations`: override global values
    /// 4. Falls back to global config for anything not overridden
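    ///
    /// Illustrative sketch (assumes a `BenchmarkDef` value `bench` with
    /// `samples = Some(500)`; construction of the definition is elided):
    ///
    /// ```ignore
    /// let resolved = global_config.resolve_for_benchmark(&bench);
    /// assert_eq!(resolved.min_iterations, Some(500));
    /// assert_eq!(resolved.max_iterations, Some(500));
    /// assert_eq!(resolved.warmup_time_ns, 0); // fixed-sample mode skips warmup
    /// ```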
    pub fn resolve_for_benchmark(&self, bench: &BenchmarkDef) -> ExecutionConfig {
        // Fixed sample count mode: per-bench samples override everything
        if let Some(n) = bench.samples {
            return ExecutionConfig {
                warmup_time_ns: 0,
                measurement_time_ns: 0,
                min_iterations: Some(n),
                max_iterations: Some(n),
                ..self.clone()
            };
        }

        ExecutionConfig {
            warmup_time_ns: bench.warmup_ns.unwrap_or(self.warmup_time_ns),
            measurement_time_ns: bench.measurement_ns.unwrap_or(self.measurement_time_ns),
            min_iterations: bench.min_iterations.or(self.min_iterations),
            max_iterations: bench.max_iterations.or(self.max_iterations),
            ..self.clone()
        }
    }
}

impl Default for ExecutionConfig {
    fn default() -> Self {
        Self {
            warmup_time_ns: 3_000_000_000,      // 3 seconds
            measurement_time_ns: 5_000_000_000, // 5 seconds
            min_iterations: Some(100),
            max_iterations: None,
            track_allocations: true,
            bootstrap_iterations: 100_000, // Matches Criterion default
            confidence_level: 0.95,
        }
    }
}

/// Result from executing a single benchmark
#[derive(Debug)]
pub struct BenchExecutionResult {
    pub benchmark_id: String,
    pub benchmark_name: String,
    pub group: String,
    pub file: String,
    pub line: u32,
    pub status: BenchmarkStatus,
    pub samples: Vec<f64>,
    /// CPU cycles per sample (parallel with samples)
    pub cpu_cycles: Vec<u64>,
    pub alloc_bytes: u64,
    pub alloc_count: u64,
    pub duration_ns: u64,
    pub error_message: Option<String>,
    pub failure_kind: Option<String>,
    pub backtrace: Option<String>,
    pub severity: fluxbench_core::Severity,
    /// Per-benchmark regression threshold (0.0 = use global)
    pub threshold: f64,
}

/// Execute benchmarks and produce results (in-process mode)
pub struct Executor {
    config: ExecutionConfig,
    results: Vec<BenchExecutionResult>,
}

impl Executor {
    /// Create a new in-process executor with the given configuration
    pub fn new(config: ExecutionConfig) -> Self {
        Self {
            config,
            results: Vec::new(),
        }
    }

    /// Execute all provided benchmarks
    pub fn execute(&mut self, benchmarks: &[&BenchmarkDef]) -> Vec<BenchExecutionResult> {
        let pb = ProgressBar::new(benchmarks.len() as u64);
        pb.set_style(
            ProgressStyle::default_bar()
                .template(
                    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}",
                )
                .unwrap_or_else(|_| ProgressStyle::default_bar())
                .progress_chars("#>-"),
        );

        for bench in benchmarks {
            pb.set_message(bench.id.to_string());
            let result = self.execute_single(bench);
            self.results.push(result);
            pb.inc(1);
        }

        pb.finish_with_message("Complete");
        std::mem::take(&mut self.results)
    }

    /// Execute a single benchmark
    fn execute_single(&self, bench: &BenchmarkDef) -> BenchExecutionResult {
        let start = Instant::now();
        let cfg = self.config.resolve_for_benchmark(bench);

        // Run with panic catching
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let bencher = Bencher::new(cfg.track_allocations);

            run_benchmark_loop(
                bencher,
                |b| (bench.runner_fn)(b),
                cfg.warmup_time_ns,
                cfg.measurement_time_ns,
                cfg.min_iterations,
                cfg.max_iterations,
            )
        }));

        let duration_ns = start.elapsed().as_nanos() as u64;

        match result {
            Ok(bench_result) => {
                // Extract timing samples as f64 for statistics
                let samples: Vec<f64> = bench_result
                    .samples
                    .iter()
                    .map(|s| s.duration_nanos as f64)
                    .collect();

                // Extract CPU cycles (parallel array with samples)
                let cpu_cycles: Vec<u64> =
                    bench_result.samples.iter().map(|s| s.cpu_cycles).collect();

                // Sum allocations
                let alloc_bytes: u64 = bench_result.samples.iter().map(|s| s.alloc_bytes).sum();
                let alloc_count: u64 = bench_result
                    .samples
                    .iter()
                    .map(|s| s.alloc_count as u64)
                    .sum();

                BenchExecutionResult {
                    benchmark_id: bench.id.to_string(),
                    benchmark_name: bench.name.to_string(),
                    group: bench.group.to_string(),
                    file: bench.file.to_string(),
                    line: bench.line,
                    status: BenchmarkStatus::Passed,
                    samples,
                    cpu_cycles,
                    alloc_bytes,
                    alloc_count,
                    duration_ns,
                    error_message: None,
                    failure_kind: None,
                    backtrace: None,
                    severity: bench.severity,
                    threshold: bench.threshold,
                }
            }
            Err(panic) => {
                let message = if let Some(s) = panic.downcast_ref::<&str>() {
                    s.to_string()
                } else if let Some(s) = panic.downcast_ref::<String>() {
                    s.clone()
                } else {
                    "Unknown panic".to_string()
                };

                BenchExecutionResult {
                    benchmark_id: bench.id.to_string(),
                    benchmark_name: bench.name.to_string(),
                    group: bench.group.to_string(),
                    file: bench.file.to_string(),
                    line: bench.line,
                    status: BenchmarkStatus::Crashed,
                    samples: Vec::new(),
                    cpu_cycles: Vec::new(),
                    alloc_bytes: 0,
                    alloc_count: 0,
                    duration_ns,
                    error_message: Some(message),
                    failure_kind: Some("panic".to_string()),
                    backtrace: None,
                    severity: bench.severity,
                    threshold: bench.threshold,
                }
            }
        }
    }
}

/// Executor that runs benchmarks in isolated worker processes via IPC
///
/// This provides crash isolation - if a benchmark panics or crashes,
/// it won't take down the supervisor process.
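///
/// A minimal construction sketch (values are illustrative; `defs` is a
/// hypothetical slice of `&BenchmarkDef` gathered elsewhere):
///
/// ```ignore
/// let exec = IsolatedExecutor::new(
///     ExecutionConfig::default(),
///     Duration::from_secs(30), // per-benchmark timeout
///     true,                    // reuse worker processes between benchmarks
///     4,                       // number of worker processes
/// );
/// let results = exec.execute(&defs);
/// ```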
pub struct IsolatedExecutor {
    config: ExecutionConfig,
    timeout: Duration,
    reuse_workers: bool,
    num_workers: usize,
}

impl IsolatedExecutor {
    /// Create a new isolated executor
    pub fn new(
        config: ExecutionConfig,
        timeout: Duration,
        reuse_workers: bool,
        num_workers: usize,
    ) -> Self {
        Self {
            config,
            timeout,
            reuse_workers,
            num_workers: num_workers.max(1),
        }
    }

    /// Execute all provided benchmarks in isolated worker processes
    pub fn execute(&self, benchmarks: &[&BenchmarkDef]) -> Vec<BenchExecutionResult> {
        let pb = ProgressBar::new(benchmarks.len() as u64);
        pb.set_style(
            ProgressStyle::default_bar()
                .template(
                    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}",
                )
                .unwrap_or_else(|_| ProgressStyle::default_bar())
                .progress_chars("#>-"),
        );
        pb.set_message("Starting isolated workers...");

        // Build per-benchmark IPC configs
        let ipc_configs: Vec<BenchmarkConfig> = benchmarks
            .iter()
            .map(|bench| {
                let cfg = self.config.resolve_for_benchmark(bench);
                BenchmarkConfig {
                    warmup_time_ns: cfg.warmup_time_ns,
                    measurement_time_ns: cfg.measurement_time_ns,
                    min_iterations: cfg.min_iterations,
                    max_iterations: cfg.max_iterations,
                    track_allocations: cfg.track_allocations,
                    fail_on_allocation: false,
                    timeout_ns: self.timeout.as_nanos() as u64,
                }
            })
            .collect();

        // Use the first config as default for the supervisor
        let default_config = ipc_configs.first().cloned().unwrap_or(BenchmarkConfig {
            warmup_time_ns: self.config.warmup_time_ns,
            measurement_time_ns: self.config.measurement_time_ns,
            min_iterations: self.config.min_iterations,
            max_iterations: self.config.max_iterations,
            track_allocations: self.config.track_allocations,
            fail_on_allocation: false,
            timeout_ns: self.timeout.as_nanos() as u64,
        });

        let supervisor = Supervisor::new(default_config, self.timeout, self.num_workers);

        // Run benchmarks via IPC with per-benchmark configs
        let ipc_results = if self.reuse_workers {
            supervisor.run_with_reuse_configs(benchmarks, &ipc_configs)
        } else {
            supervisor.run_all_configs(benchmarks, &ipc_configs)
        };

        // Convert IPC results to BenchExecutionResult
        let mut results = Vec::with_capacity(benchmarks.len());

        match ipc_results {
            Ok(ipc_results) => {
                for (ipc_result, bench) in ipc_results.into_iter().zip(benchmarks.iter()) {
                    pb.set_message(bench.id.to_string());
                    results.push(self.convert_ipc_result(ipc_result, bench));
                    pb.inc(1);
                }
            }
            Err(e) => {
                // Supervisor-level failure - mark all as crashed
                for bench in benchmarks {
                    results.push(BenchExecutionResult {
                        benchmark_id: bench.id.to_string(),
                        benchmark_name: bench.name.to_string(),
                        group: bench.group.to_string(),
                        file: bench.file.to_string(),
                        line: bench.line,
                        status: BenchmarkStatus::Crashed,
                        samples: Vec::new(),
                        cpu_cycles: Vec::new(),
                        alloc_bytes: 0,
                        alloc_count: 0,
                        duration_ns: 0,
                        error_message: Some(format!("Supervisor error: {}", e)),
                        failure_kind: Some("crashed".to_string()),
                        backtrace: None,
                        severity: bench.severity,
                        threshold: bench.threshold,
                    });
                    pb.inc(1);
                }
            }
        }

        pb.finish_with_message("Complete (isolated)");
        results
    }

    /// Convert an IPC result to a BenchExecutionResult
    fn convert_ipc_result(
        &self,
        ipc_result: IpcBenchmarkResult,
        bench: &BenchmarkDef,
    ) -> BenchExecutionResult {
        let (status, error_message, failure_kind, backtrace) = match ipc_result.status {
            IpcBenchmarkStatus::Success => (BenchmarkStatus::Passed, None, None, None),
            IpcBenchmarkStatus::Failed {
                message,
                kind,
                backtrace,
            } => (
                BenchmarkStatus::Failed,
                Some(message),
                Some(kind),
                backtrace,
            ),
            IpcBenchmarkStatus::Crashed {
                message,
                kind,
                backtrace,
            } => (
                BenchmarkStatus::Crashed,
                Some(message),
                Some(kind),
                backtrace,
            ),
        };

        // Extract timing samples as f64 for statistics
        let samples: Vec<f64> = ipc_result
            .samples
            .iter()
            .map(|s| s.duration_nanos as f64)
            .collect();

        // Extract CPU cycles
        let cpu_cycles: Vec<u64> = ipc_result.samples.iter().map(|s| s.cpu_cycles).collect();

        // Sum allocations
        let alloc_bytes: u64 = ipc_result.samples.iter().map(|s| s.alloc_bytes).sum();
        let alloc_count: u64 = ipc_result
            .samples
            .iter()
            .map(|s| s.alloc_count as u64)
            .sum();

        BenchExecutionResult {
            benchmark_id: bench.id.to_string(),
            benchmark_name: bench.name.to_string(),
            group: bench.group.to_string(),
            file: bench.file.to_string(),
            line: bench.line,
            status,
            samples,
            cpu_cycles,
            alloc_bytes,
            alloc_count,
            duration_ns: ipc_result.total_duration_nanos,
            error_message,
            failure_kind,
            backtrace,
            severity: bench.severity,
            threshold: bench.threshold,
        }
    }
}