use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::Result;
use chrono::Utc;
use super::{BenchResult, BenchRun};
/// Process-wide flag raised by `run_single_core_pass` for the duration of its
/// inner `run_pass` call and lowered again afterwards.
static PINNED_SINGLE_CORE: AtomicBool = AtomicBool::new(false);

/// Reports whether the single-core (pinned) benchmark pass is currently running.
///
/// Benchmarks may call this from `Bencher::run` to adapt their behaviour when
/// executed in the single-core pass.
pub fn pinned_to_single_core() -> bool {
    // Relaxed: the flag is a standalone boolean and does not publish any other data.
    AtomicBool::load(&PINNED_SINGLE_CORE, Ordering::Relaxed)
}
/// Returns `true` when the environment asks for an extra single-core pass.
///
/// Either of these triggers it:
/// - `NORNIR_BENCH_PIN` set to any non-empty value (including `"0"`, which names CPU 0);
/// - `NORNIR_BENCH_ST` set to a non-empty value other than `"0"`.
///
/// Unset, empty, and non-UTF-8 values all count as "not requested".
pub fn single_core_pass_requested() -> bool {
    let non_empty = |name: &str| std::env::var(name).ok().filter(|value| !value.is_empty());
    let pin_requested = non_empty("NORNIR_BENCH_PIN").is_some();
    let st_requested = non_empty("NORNIR_BENCH_ST").is_some_and(|value| value != "0");
    pin_requested || st_requested
}
/// A single benchmark that can be registered with `register_bench!` /
/// `register_bench_ordered!` and executed by the runner.
///
/// Implementors must be `Sync`.
pub trait Bencher: Sync {
    /// Executes the benchmark once and returns its metrics.
    ///
    /// An `Err` is recorded by the runner as a failed test outcome rather than
    /// aborting the process.
    fn run(&self) -> Result<BenchResult>;

    /// Identifier used for progress output, test-outcome names, and as the
    /// tie-breaker when ordering benchmarks that share the same `order`.
    fn id(&self) -> &'static str;
}
/// Registers a benchmark with the default order key `0`.
///
/// `$expr` must evaluate to a value implementing `Bencher`. This is exactly
/// `register_bench_ordered!(0, $expr)`; delegating keeps the two macros from
/// drifting apart.
#[macro_export]
macro_rules! register_bench {
    ($expr:expr) => {
        $crate::register_bench_ordered! { 0, $expr }
    };
}
/// Registers a benchmark with an explicit `order` key.
///
/// `$order` is the `i32` sort key (lower runs first; ties are broken by
/// `Bencher::id`). `$expr` must evaluate to a value implementing `Bencher`.
/// Each call of the generated `make` function boxes a fresh value and leaks it,
/// yielding the `&'static dyn Bencher` the registry hands out.
#[macro_export]
macro_rules! register_bench_ordered {
    ($order:expr, $expr:expr) => {
        $crate::bench::api::inventory_submit! {
            $crate::bench::api::BencherRegistration {
                order: $order,
                make: || -> &'static dyn $crate::bench::api::Bencher {
                    ::std::boxed::Box::leak(::std::boxed::Box::new($expr))
                },
            }
        }
    };
}
/// Re-export of `inventory::submit` under a crate-local path, so the
/// registration macros can expand to `$crate::bench::api::inventory_submit!`.
pub use inventory::submit as inventory_submit;

/// One benchmark registration, submitted by the registration macros and
/// collected through `inventory`.
pub struct BencherRegistration {
    /// Sort key: registrations run in ascending `order`, then by `Bencher::id`.
    pub order: i32,
    /// Constructs (and leaks) the benchmark instance on demand.
    pub make: fn() -> &'static dyn Bencher,
}

inventory::collect!(BencherRegistration);
/// Runs every registered benchmark and prints the whole run as one JSON line
/// on stdout.
///
/// Registrations run in ascending `order`, ties broken by `Bencher::id`.
///
/// Environment variables:
/// - `NORNIR_BENCH_STOP_ON_ERROR`: any value other than empty or `"0"` stops at
///   the first failing benchmark. Once a failure has occurred, the optional
///   single-core pass is skipped too, so the run really stops.
/// - `NORNIR_BENCH_PIN` / `NORNIR_BENCH_ST`: request an additional single-core
///   pass (see `single_core_pass_requested`). Its results are appended with an
///   `_st` suffix.
/// - `NORNIR_BENCH_VERSION`: overrides the reported version. Empty is ignored;
///   the default is this crate's `CARGO_PKG_VERSION`.
/// - `NORNIR_MACHINE`: machine label (empty string if unset).
///
/// # Errors
/// Returns an error only if serializing the `BenchRun` to JSON fails. Individual
/// benchmark failures are recorded as failed test outcomes, not propagated.
pub fn run_main_json() -> Result<()> {
    let mut resolved: Vec<(i32, &'static dyn Bencher)> =
        inventory::iter::<BencherRegistration>()
            .map(|reg| (reg.order, (reg.make)()))
            .collect();
    resolved.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.id().cmp(b.1.id())));

    let stop_on_error = std::env::var("NORNIR_BENCH_STOP_ON_ERROR")
        .map(|v| v != "0" && !v.is_empty())
        .unwrap_or(false);
    let machine_cores = num_cpus_best_effort() as u32;

    let (mut results, mut tests) = run_pass(&resolved, stop_on_error);

    // `run_pass` only breaks out of its own loop. Without this check, a failure
    // in the normal pass would be followed by a full single-core re-run.
    let aborted = stop_on_error && tests.iter().any(|t| !t.passed);
    if !aborted && single_core_pass_requested() {
        let (st_results, st_tests) = run_single_core_pass(&resolved, stop_on_error);
        results.extend(st_results);
        tests.extend(st_tests);
    }

    let now = Utc::now();
    let run = BenchRun {
        date: now.format("%Y-%m-%d").to_string(),
        timestamp: Some(now.to_rfc3339()),
        version: std::env::var("NORNIR_BENCH_VERSION")
            .ok()
            .filter(|v| !v.is_empty())
            .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string()),
        machine: std::env::var("NORNIR_MACHINE").unwrap_or_default(),
        cores: machine_cores,
        results,
        tests,
    };
    println!("{}", serde_json::to_string(&run)?);
    Ok(())
}
/// Number of logical CPUs available to this process, as reported by
/// `std::thread::available_parallelism`; falls back to `1` when that query fails.
fn num_cpus_best_effort() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
/// Builds a short human-readable summary of a result for progress output.
///
/// It scans numeric metrics in the map's iteration order and uses the first one
/// whose lowercased key looks like a rate:
/// - keys ending in `ops_sec`, or exactly `ops_per_sec`, render as `"<n> ops/s"`;
/// - keys ending in `_mbs` or `mbps`, or exactly `mb_per_sec`, render as `"<n> MB/s"`.
///
/// If none matches, the first numeric metric is shown as `"key=<n>"`. If there is
/// no numeric metric at all, the result is an empty string.
fn summarize_metrics(r: &BenchResult) -> String {
    let numeric = || {
        r.metrics
            .iter()
            .filter_map(|(key, value)| value.as_f64().map(|n| (key, n)))
    };

    let rate = numeric().find_map(|(key, n)| {
        let key = key.to_ascii_lowercase();
        if key.ends_with("ops_sec") || key == "ops_per_sec" {
            Some(format!("{} ops/s", trim_num(n)))
        } else if key.ends_with("_mbs") || key.ends_with("mbps") || key == "mb_per_sec" {
            Some(format!("{} MB/s", trim_num(n)))
        } else {
            None
        }
    });

    rate.or_else(|| numeric().next().map(|(k, n)| format!("{k}={}", trim_num(n))))
        .unwrap_or_default()
}
/// Formats a metric value compactly.
///
/// Integral values with magnitude below `1e15` print with no decimal part. Every
/// other value, including NaN and infinities, prints with two decimal places.
fn trim_num(n: f64) -> String {
    let is_integral = n.fract() == 0.0;
    let fits_exactly = n.abs() < 1e15;
    match (is_integral, fits_exactly) {
        (true, true) => (n as i64).to_string(),
        _ => format!("{:.2}", n),
    }
}
/// Runs each benchmark in `resolved`, in slice order, on the calling thread.
///
/// For every benchmark it:
/// 1. emits a "starting" progress line;
/// 2. samples telemetry around `Bencher::run` and measures wall time;
/// 3. on success, merges the telemetry into the result's metrics and emits a
///    "done" line with a metric summary;
/// 4. on failure, emits a "failed" line with the error chain (`{:#}`).
///
/// Each benchmark also yields a `TestOutcome` with its duration in
/// milliseconds. With `stop_on_error`, the loop ends after the first failure.
/// Returns the successful results and all outcomes, each in execution order.
fn run_pass(
    resolved: &[(i32, &'static dyn Bencher)],
    stop_on_error: bool,
) -> (Vec<BenchResult>, Vec<super::TestOutcome>) {
    use super::progress;
    use super::telemetry;

    let single_core = pinned_to_single_core();
    let mut results = Vec::new();
    let mut tests = Vec::new();

    for &(_, bench) in resolved {
        let id = bench.id().to_string();
        progress::emit(&progress::starting_line(&id, single_core));

        let sampler = telemetry::Sampler::start();
        let started = std::time::Instant::now();
        let outcome = bench.run();
        let elapsed = started.elapsed();
        let telem = sampler.stop();
        let duration_ms = Some(elapsed.as_secs_f64() * 1000.0);

        let failed = match outcome {
            Ok(mut result) => {
                telemetry::inject_into_metrics(&mut result.metrics, &telem);
                progress::emit(&progress::done_line_with_telem(
                    &id,
                    single_core,
                    &summarize_metrics(&result),
                    elapsed,
                    telem.cores_busy_max,
                    telem.n_cores,
                ));
                results.push(result);
                tests.push(super::TestOutcome {
                    name: id,
                    passed: true,
                    duration_ms,
                    message: None,
                });
                false
            }
            Err(err) => {
                let message = format!("{err:#}");
                progress::emit(&progress::failed_line(&id, single_core, &message, elapsed));
                tests.push(super::TestOutcome {
                    name: id,
                    passed: false,
                    duration_ms,
                    message: Some(message),
                });
                true
            }
        };

        if failed && stop_on_error {
            break;
        }
    }

    (results, tests)
}
/// Re-runs every benchmark in "single-core" mode and relabels the output.
///
/// For the duration of the pass this:
/// - sets `RAYON_NUM_THREADS=1`. NOTE(review): rayon reads this only when its
///   global pool is first built, so it has no effect if an earlier pass already
///   initialized the pool — confirm benches do not rely on it alone;
/// - best-effort pins the calling thread to the CPU named by `NORNIR_BENCH_PIN`.
///   The value is parsed as an index below 64; anything else falls back to CPU 0;
/// - raises the flag reported by `pinned_to_single_core`.
///
/// The flag and the previous `RAYON_NUM_THREADS` value are restored when the
/// inner pass ends, including when a benchmark panics. CPU affinity is *not*
/// restored.
///
/// Every result and outcome name gets an `_st` suffix, and each result gains a
/// `cores: 1` metric.
fn run_single_core_pass(
    resolved: &[(i32, &'static dyn Bencher)],
    stop_on_error: bool,
) -> (Vec<BenchResult>, Vec<super::TestOutcome>) {
    /// Undoes this pass's process-global changes when dropped, so an unwinding
    /// benchmark cannot leave the flag raised or the env var overridden.
    struct PassStateGuard {
        prev_rayon_threads: Option<std::ffi::OsString>,
    }

    impl Drop for PassStateGuard {
        fn drop(&mut self) {
            PINNED_SINGLE_CORE.store(false, Ordering::Relaxed);
            // SAFETY: same assumption as the `set_var` below — no other thread
            // reads or writes the process environment concurrently.
            unsafe {
                match self.prev_rayon_threads.take() {
                    Some(prev) => std::env::set_var("RAYON_NUM_THREADS", prev),
                    None => std::env::remove_var("RAYON_NUM_THREADS"),
                }
            }
        }
    }

    let guard = PassStateGuard {
        prev_rayon_threads: std::env::var_os("RAYON_NUM_THREADS"),
    };
    // SAFETY: assumes no other thread reads or writes the environment while the
    // runner is between benchmarks.
    unsafe { std::env::set_var("RAYON_NUM_THREADS", "1") };

    let cpu = std::env::var("NORNIR_BENCH_PIN")
        .ok()
        .and_then(|v| v.trim().parse::<usize>().ok())
        .filter(|&n| n < 64)
        .unwrap_or(0);
    pin_to_cpu_best_effort(cpu);

    PINNED_SINGLE_CORE.store(true, Ordering::Relaxed);
    let (results, tests) = run_pass(resolved, stop_on_error);
    // Lower the flag and restore the env var before relabelling.
    drop(guard);

    let results = results
        .into_iter()
        .map(|mut r| {
            r.name.push_str("_st");
            r.metrics.insert("cores".into(), serde_json::json!(1));
            r
        })
        .collect();
    let tests = tests
        .into_iter()
        .map(|mut t| {
            t.name.push_str("_st");
            t
        })
        .collect();
    (results, tests)
}
/// Best-effort: restricts the calling thread to logical CPU `cpu`.
///
/// It does nothing in these cases:
/// - `NORNIR_BENCH_NO_AFFINITY` is set to any value;
/// - `cpu` does not fit in the 64-bit mask built here;
/// - the target is not Linux.
///
/// On Linux it issues a raw `sched_setaffinity(0, 8, &mask)` and ignores the
/// return value.
fn pin_to_cpu_best_effort(cpu: usize) {
    let affinity_disabled = std::env::var_os("NORNIR_BENCH_NO_AFFINITY").is_some();

    #[cfg(target_os = "linux")]
    {
        if !affinity_disabled && cpu < 64 {
            let mask: u64 = 1u64 << cpu;
            let mask_addr = std::ptr::addr_of!(mask) as usize;
            // SAFETY: pid 0 targets the calling thread. `mask_addr` points to a
            // live, initialized `u64` for the whole call, and the length passed
            // is exactly its size.
            let _ = unsafe {
                libc_syscall::syscall3(
                    libc_syscall::SCHED_SETAFFINITY,
                    0,
                    std::mem::size_of_val(&mask),
                    mask_addr,
                )
            };
        }
    }

    #[cfg(not(target_os = "linux"))]
    {
        let _ = (cpu, affinity_disabled);
    }
}
/// Minimal raw-syscall shim, used here to call `sched_setaffinity` without
/// going through `libc`.
#[cfg(target_os = "linux")]
mod libc_syscall {
    /// Syscall number, placed in the architecture's syscall-number register.
    pub type Nr = i64;
    /// `sched_setaffinity` in the Linux x86_64 syscall table.
    #[cfg(target_arch = "x86_64")]
    pub const SCHED_SETAFFINITY: Nr = 203;
    /// `sched_setaffinity` in the Linux aarch64 (asm-generic) syscall table.
    #[cfg(target_arch = "aarch64")]
    pub const SCHED_SETAFFINITY: Nr = 122;
    /// Placeholder for other architectures. The fallback `syscall3` below never
    /// enters the kernel, so this value is never actually used as a syscall number.
    #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
    pub const SCHED_SETAFFINITY: Nr = -1;
    /// Issues Linux syscall `nr` with three arguments and returns the kernel's
    /// raw return register value. On Linux, failures come back as a negated errno
    /// rather than through `errno`.
    ///
    /// # Safety
    ///
    /// `nr` must be a valid syscall number for the current architecture. `a`, `b`
    /// and `c` must satisfy that syscall's contract; for example, any argument
    /// the kernel dereferences must point to memory valid for the stated length.
    #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))]
    pub unsafe fn syscall3(nr: Nr, a: usize, b: usize, c: usize) -> isize {
        let ret: isize;
        // x86_64: number in rax, args in rdi/rsi/rdx, result in rax. The
        // `syscall` instruction itself overwrites rcx and r11, hence the clobbers.
        #[cfg(target_arch = "x86_64")]
        unsafe {
            core::arch::asm!(
                "syscall",
                inlateout("rax") nr as isize => ret,
                in("rdi") a,
                in("rsi") b,
                in("rdx") c,
                lateout("rcx") _,
                lateout("r11") _,
                options(nostack, preserves_flags),
            );
        }
        // aarch64: number in x8, args in x0..x2, result returned in x0.
        #[cfg(target_arch = "aarch64")]
        unsafe {
            core::arch::asm!(
                "svc 0",
                in("x8") nr as isize,
                inlateout("x0") a => ret,
                in("x1") b,
                in("x2") c,
                options(nostack, preserves_flags),
            );
        }
        ret
    }
    /// Fallback for architectures without a hand-written stub. It performs no
    /// syscall and always returns -1.
    #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
    pub unsafe fn syscall3(_nr: Nr, _a: usize, _b: usize, _c: usize) -> isize {
        -1
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes tests that read or mutate process-wide environment variables.
    /// The test harness runs tests on multiple threads by default.
    static ENV_GUARD: Mutex<()> = Mutex::new(());

    /// Locks `ENV_GUARD`, recovering from poisoning. Without this, one failed
    /// env test would make every later env test panic with `PoisonError` instead
    /// of reporting its own result.
    fn env_lock() -> MutexGuard<'static, ()> {
        ENV_GUARD.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Trivial benchmark used to check that `register_bench!` reaches the registry.
    struct Demo;
    impl Bencher for Demo {
        fn id(&self) -> &'static str {
            "demo.always_42"
        }
        fn run(&self) -> Result<BenchResult> {
            let mut metrics = serde_json::Map::new();
            metrics.insert("answer".into(), serde_json::json!(42));
            Ok(BenchResult { name: "demo".into(), metrics })
        }
    }
    crate::register_bench!(Demo);

    #[test]
    fn registry_includes_demo() {
        let ids: Vec<&'static str> =
            inventory::iter::<BencherRegistration>().map(|r| (r.make)().id()).collect();
        assert!(ids.contains(&"demo.always_42"), "registry missing demo: {ids:?}");
    }

    /// Benchmark that records whether it observed the single-core flag while running.
    struct PinAware;
    impl Bencher for PinAware {
        fn id(&self) -> &'static str {
            "demo.pin_aware"
        }
        fn run(&self) -> Result<BenchResult> {
            let mut metrics = serde_json::Map::new();
            metrics.insert("saw_pin".into(), serde_json::json!(pinned_to_single_core()));
            metrics.insert("ops_sec".into(), serde_json::json!(100));
            Ok(BenchResult { name: "pin_aware".into(), metrics })
        }
    }

    #[test]
    fn single_core_pass_relabels_and_sets_flag() {
        let _g = env_lock();
        // Keep the test from changing the test process's CPU affinity.
        unsafe { std::env::set_var("NORNIR_BENCH_NO_AFFINITY", "1") };
        let b: &'static dyn Bencher = &PinAware;
        let resolved: Vec<(i32, &'static dyn Bencher)> = vec![(0, b)];

        let (normal, _) = run_pass(&resolved, false);
        assert_eq!(normal.len(), 1);
        assert_eq!(normal[0].name, "pin_aware");
        assert_eq!(normal[0].metrics["saw_pin"], serde_json::json!(false));

        let (st, st_tests) = run_single_core_pass(&resolved, false);
        assert_eq!(st.len(), 1);
        assert_eq!(st[0].name, "pin_aware_st");
        assert_eq!(st[0].metrics["saw_pin"], serde_json::json!(true));
        assert_eq!(st[0].metrics["cores"], serde_json::json!(1));
        assert_eq!(st_tests[0].name, "demo.pin_aware_st");
        assert!(st_tests[0].passed);
        assert!(!pinned_to_single_core());
        unsafe { std::env::remove_var("NORNIR_BENCH_NO_AFFINITY") };
    }

    #[test]
    fn pass_request_env_parsing() {
        let _g = env_lock();
        unsafe {
            std::env::remove_var("NORNIR_BENCH_PIN");
            std::env::remove_var("NORNIR_BENCH_ST");
        }
        assert!(!single_core_pass_requested());
        unsafe { std::env::set_var("NORNIR_BENCH_ST", "0") };
        assert!(!single_core_pass_requested());
        unsafe { std::env::set_var("NORNIR_BENCH_ST", "1") };
        assert!(single_core_pass_requested());
        unsafe {
            std::env::remove_var("NORNIR_BENCH_ST");
            // "0" is a CPU index, so any non-empty PIN value requests the pass.
            std::env::set_var("NORNIR_BENCH_PIN", "0");
        }
        assert!(single_core_pass_requested());
        unsafe { std::env::remove_var("NORNIR_BENCH_PIN") };
        assert!(!single_core_pass_requested());
    }
}