use crate::supervisor::{IpcBenchmarkResult, IpcBenchmarkStatus, Supervisor};
use fluxbench_core::{Bencher, BenchmarkDef, run_benchmark_loop};
use fluxbench_ipc::BenchmarkConfig;
use fluxbench_report::BenchmarkStatus;
use indicatif::{ProgressBar, ProgressStyle};
use std::time::{Duration, Instant};
/// Base execution parameters applied to every benchmark; per-benchmark
/// overrides are layered on via `resolve_for_benchmark`.
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
/// Length of the warmup phase, in nanoseconds (0 disables warmup).
pub warmup_time_ns: u64,
/// Length of the measurement phase, in nanoseconds.
pub measurement_time_ns: u64,
/// Lower bound on measured iterations, if any.
pub min_iterations: Option<u64>,
/// Upper bound on measured iterations, if any.
pub max_iterations: Option<u64>,
/// Whether the bencher records allocation counts/bytes per sample.
pub track_allocations: bool,
/// Number of bootstrap resamples used for statistics downstream.
/// NOTE(review): not read in this file — presumably consumed by the
/// analysis/report stage; confirm at the call site.
pub bootstrap_iterations: usize,
/// Confidence level for interval estimates (e.g. 0.95).
pub confidence_level: f64,
}
impl ExecutionConfig {
    /// Derive the effective configuration for one benchmark by layering the
    /// benchmark's own overrides on top of this base configuration.
    ///
    /// A fixed sample count (`bench.samples`) takes precedence over all other
    /// settings: both timing phases are zeroed and the iteration bounds are
    /// pinned to exactly that count. Otherwise, each timing/iteration field
    /// falls back to the base value when the benchmark does not override it.
    pub fn resolve_for_benchmark(&self, bench: &BenchmarkDef) -> ExecutionConfig {
        match bench.samples {
            Some(count) => ExecutionConfig {
                warmup_time_ns: 0,
                measurement_time_ns: 0,
                min_iterations: Some(count),
                max_iterations: Some(count),
                ..self.clone()
            },
            None => ExecutionConfig {
                warmup_time_ns: bench.warmup_ns.unwrap_or(self.warmup_time_ns),
                measurement_time_ns: bench.measurement_ns.unwrap_or(self.measurement_time_ns),
                min_iterations: bench.min_iterations.or(self.min_iterations),
                max_iterations: bench.max_iterations.or(self.max_iterations),
                ..self.clone()
            },
        }
    }
}
impl Default for ExecutionConfig {
fn default() -> Self {
Self {
warmup_time_ns: 3_000_000_000, measurement_time_ns: 5_000_000_000, min_iterations: Some(100),
max_iterations: None,
track_allocations: true,
bootstrap_iterations: 100_000, confidence_level: 0.95,
}
}
}
/// Outcome of executing a single benchmark: identity, status, raw samples,
/// allocation totals, and failure details when the run did not succeed.
#[derive(Debug)]
pub struct BenchExecutionResult {
/// Stable identifier of the benchmark.
pub benchmark_id: String,
/// Human-readable benchmark name.
pub benchmark_name: String,
/// Group the benchmark belongs to.
pub group: String,
/// Source file where the benchmark is defined.
pub file: String,
/// Line number of the benchmark definition.
pub line: u32,
/// Passed / Failed / Crashed.
pub status: BenchmarkStatus,
/// Per-sample durations in nanoseconds (empty on crash).
pub samples: Vec<f64>,
/// Per-sample CPU cycle counts, parallel to `samples`.
pub cpu_cycles: Vec<u64>,
/// Total bytes allocated across all samples.
pub alloc_bytes: u64,
/// Total allocation count across all samples.
pub alloc_count: u64,
/// Wall-clock duration of the whole run, in nanoseconds.
pub duration_ns: u64,
/// Error text when the run failed or crashed.
pub error_message: Option<String>,
/// Failure category, e.g. "panic" or "crashed".
pub failure_kind: Option<String>,
/// Backtrace captured by an isolated worker, when available.
pub backtrace: Option<String>,
/// Severity declared on the benchmark definition.
pub severity: fluxbench_core::Severity,
/// Threshold declared on the benchmark definition.
pub threshold: f64,
}
/// Runs benchmarks sequentially in the current process, catching panics
/// and turning each run into a `BenchExecutionResult`.
pub struct Executor {
// Base configuration; per-benchmark overrides are resolved at run time.
config: ExecutionConfig,
// Accumulated results, drained and returned by `execute`.
results: Vec<BenchExecutionResult>,
}
impl Executor {
    /// Create an in-process executor using `config` as the base configuration.
    pub fn new(config: ExecutionConfig) -> Self {
        Self {
            config,
            results: Vec::new(),
        }
    }

    /// Run every benchmark in order inside this process, showing a progress
    /// bar, and return the collected results. The internal buffer is drained,
    /// so a later call starts from an empty result set.
    pub fn execute(&mut self, benchmarks: &[&BenchmarkDef]) -> Vec<BenchExecutionResult> {
        let progress = ProgressBar::new(benchmarks.len() as u64);
        // Fall back to the plain default bar if the template fails to parse.
        let style = ProgressStyle::default_bar()
            .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}")
            .unwrap_or_else(|_| ProgressStyle::default_bar())
            .progress_chars("#>-");
        progress.set_style(style);
        for bench in benchmarks {
            progress.set_message(bench.id.to_string());
            let outcome = self.execute_single(bench);
            self.results.push(outcome);
            progress.inc(1);
        }
        progress.finish_with_message("Complete");
        std::mem::take(&mut self.results)
    }

    /// Run one benchmark inside a panic guard and translate the outcome
    /// (samples on success, panic payload on crash) into a result record.
    fn execute_single(&self, bench: &BenchmarkDef) -> BenchExecutionResult {
        let started = Instant::now();
        let cfg = self.config.resolve_for_benchmark(bench);
        // AssertUnwindSafe: the closure only borrows `cfg` and the benchmark
        // definition; nothing it touches is reused in a broken state after a
        // panic.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            run_benchmark_loop(
                Bencher::new(cfg.track_allocations),
                |b| (bench.runner_fn)(b),
                cfg.warmup_time_ns,
                cfg.measurement_time_ns,
                cfg.min_iterations,
                cfg.max_iterations,
            )
        }));
        let duration_ns = started.elapsed().as_nanos() as u64;
        match outcome {
            Ok(run) => {
                // Aggregate all per-sample fields in a single pass.
                let mut samples = Vec::with_capacity(run.samples.len());
                let mut cpu_cycles = Vec::with_capacity(run.samples.len());
                let mut alloc_bytes: u64 = 0;
                let mut alloc_count: u64 = 0;
                for sample in &run.samples {
                    samples.push(sample.duration_nanos as f64);
                    cpu_cycles.push(sample.cpu_cycles);
                    alloc_bytes += sample.alloc_bytes;
                    alloc_count += sample.alloc_count as u64;
                }
                BenchExecutionResult {
                    benchmark_id: bench.id.to_string(),
                    benchmark_name: bench.name.to_string(),
                    group: bench.group.to_string(),
                    file: bench.file.to_string(),
                    line: bench.line,
                    status: BenchmarkStatus::Passed,
                    samples,
                    cpu_cycles,
                    alloc_bytes,
                    alloc_count,
                    duration_ns,
                    error_message: None,
                    failure_kind: None,
                    backtrace: None,
                    severity: bench.severity,
                    threshold: bench.threshold,
                }
            }
            Err(payload) => {
                // Panic payloads are almost always &str or String; anything
                // else gets a generic message.
                let message = payload
                    .downcast_ref::<&str>()
                    .map(|s| s.to_string())
                    .or_else(|| payload.downcast_ref::<String>().cloned())
                    .unwrap_or_else(|| "Unknown panic".to_string());
                BenchExecutionResult {
                    benchmark_id: bench.id.to_string(),
                    benchmark_name: bench.name.to_string(),
                    group: bench.group.to_string(),
                    file: bench.file.to_string(),
                    line: bench.line,
                    status: BenchmarkStatus::Crashed,
                    samples: Vec::new(),
                    cpu_cycles: Vec::new(),
                    alloc_bytes: 0,
                    alloc_count: 0,
                    duration_ns,
                    error_message: Some(message),
                    failure_kind: Some("panic".to_string()),
                    backtrace: None,
                    severity: bench.severity,
                    threshold: bench.threshold,
                }
            }
        }
    }
}
/// Runs benchmarks in separate worker processes via the `Supervisor`, so a
/// crashing benchmark cannot take down the harness itself.
pub struct IsolatedExecutor {
// Base configuration; per-benchmark overrides are resolved before dispatch.
config: ExecutionConfig,
// Per-benchmark timeout passed to the supervisor and workers.
timeout: Duration,
// When true, worker processes are reused across benchmarks.
reuse_workers: bool,
// Number of worker processes (clamped to at least 1 in `new`).
num_workers: usize,
}
impl IsolatedExecutor {
    /// Create an isolated executor.
    ///
    /// `num_workers` is clamped to at least 1 so the supervisor always has a
    /// worker to dispatch to.
    pub fn new(
        config: ExecutionConfig,
        timeout: Duration,
        reuse_workers: bool,
        num_workers: usize,
    ) -> Self {
        Self {
            config,
            timeout,
            reuse_workers,
            num_workers: num_workers.max(1),
        }
    }

    /// Dispatch every benchmark to out-of-process workers and collect one
    /// result per benchmark. If the supervisor itself fails, every benchmark
    /// is reported as crashed with the supervisor error attached.
    pub fn execute(&self, benchmarks: &[&BenchmarkDef]) -> Vec<BenchExecutionResult> {
        let progress = ProgressBar::new(benchmarks.len() as u64);
        // Fall back to the plain default bar if the template fails to parse.
        progress.set_style(
            ProgressStyle::default_bar()
                .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}")
                .unwrap_or_else(|_| ProgressStyle::default_bar())
                .progress_chars("#>-"),
        );
        progress.set_message("Starting isolated workers...");
        // One IPC config per benchmark, each resolved against the base config.
        let mut ipc_configs: Vec<BenchmarkConfig> = Vec::with_capacity(benchmarks.len());
        for bench in benchmarks {
            let cfg = self.config.resolve_for_benchmark(bench);
            ipc_configs.push(BenchmarkConfig {
                warmup_time_ns: cfg.warmup_time_ns,
                measurement_time_ns: cfg.measurement_time_ns,
                min_iterations: cfg.min_iterations,
                max_iterations: cfg.max_iterations,
                track_allocations: cfg.track_allocations,
                fail_on_allocation: false,
                timeout_ns: self.timeout.as_nanos() as u64,
            });
        }
        // The supervisor needs a default config; reuse the first benchmark's
        // resolved config, or fall back to the unresolved base settings.
        let default_config = match ipc_configs.first() {
            Some(first) => first.clone(),
            None => BenchmarkConfig {
                warmup_time_ns: self.config.warmup_time_ns,
                measurement_time_ns: self.config.measurement_time_ns,
                min_iterations: self.config.min_iterations,
                max_iterations: self.config.max_iterations,
                track_allocations: self.config.track_allocations,
                fail_on_allocation: false,
                timeout_ns: self.timeout.as_nanos() as u64,
            },
        };
        let supervisor = Supervisor::new(default_config, self.timeout, self.num_workers);
        let run = if self.reuse_workers {
            supervisor.run_with_reuse_configs(benchmarks, &ipc_configs)
        } else {
            supervisor.run_all_configs(benchmarks, &ipc_configs)
        };
        let mut results = Vec::with_capacity(benchmarks.len());
        match run {
            Ok(ipc_results) => {
                for (ipc_result, bench) in ipc_results.into_iter().zip(benchmarks.iter()) {
                    progress.set_message(bench.id.to_string());
                    results.push(self.convert_ipc_result(ipc_result, bench));
                    progress.inc(1);
                }
            }
            Err(err) => {
                // Supervisor-level failure: attribute it to every benchmark.
                for bench in benchmarks {
                    results.push(BenchExecutionResult {
                        benchmark_id: bench.id.to_string(),
                        benchmark_name: bench.name.to_string(),
                        group: bench.group.to_string(),
                        file: bench.file.to_string(),
                        line: bench.line,
                        status: BenchmarkStatus::Crashed,
                        samples: Vec::new(),
                        cpu_cycles: Vec::new(),
                        alloc_bytes: 0,
                        alloc_count: 0,
                        duration_ns: 0,
                        error_message: Some(format!("Supervisor error: {}", err)),
                        failure_kind: Some("crashed".to_string()),
                        backtrace: None,
                        severity: bench.severity,
                        threshold: bench.threshold,
                    });
                    progress.inc(1);
                }
            }
        }
        progress.finish_with_message("Complete (isolated)");
        results
    }

    /// Translate one worker-side result into a `BenchExecutionResult`,
    /// pairing it with its benchmark definition for identity fields.
    fn convert_ipc_result(
        &self,
        ipc_result: IpcBenchmarkResult,
        bench: &BenchmarkDef,
    ) -> BenchExecutionResult {
        // Map the worker's status onto the report status, carrying failure
        // details along when present.
        let (status, error_message, failure_kind, backtrace) = match ipc_result.status {
            IpcBenchmarkStatus::Success => (BenchmarkStatus::Passed, None, None, None),
            IpcBenchmarkStatus::Failed {
                message,
                kind,
                backtrace,
            } => (BenchmarkStatus::Failed, Some(message), Some(kind), backtrace),
            IpcBenchmarkStatus::Crashed {
                message,
                kind,
                backtrace,
            } => (BenchmarkStatus::Crashed, Some(message), Some(kind), backtrace),
        };
        // Aggregate all per-sample fields in a single pass.
        let mut samples = Vec::with_capacity(ipc_result.samples.len());
        let mut cpu_cycles = Vec::with_capacity(ipc_result.samples.len());
        let mut alloc_bytes: u64 = 0;
        let mut alloc_count: u64 = 0;
        for sample in &ipc_result.samples {
            samples.push(sample.duration_nanos as f64);
            cpu_cycles.push(sample.cpu_cycles);
            alloc_bytes += sample.alloc_bytes;
            alloc_count += sample.alloc_count as u64;
        }
        BenchExecutionResult {
            benchmark_id: bench.id.to_string(),
            benchmark_name: bench.name.to_string(),
            group: bench.group.to_string(),
            file: bench.file.to_string(),
            line: bench.line,
            status,
            samples,
            cpu_cycles,
            alloc_bytes,
            alloc_count,
            duration_ns: ipc_result.total_duration_nanos,
            error_message,
            failure_kind,
            backtrace,
            severity: bench.severity,
            threshold: bench.threshold,
        }
    }
}