use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use log::{debug, info};
use quanta::Clock;
#[repr(align(64))]
struct PaddedAtomicBool {
val: AtomicBool,
_pad: [u8; 63],
}
impl PaddedAtomicBool {
fn new(v: bool) -> Self {
Self {
val: AtomicBool::new(v),
_pad: [0u8; 63],
}
}
}
struct SharedState {
barrier: Barrier,
flag: PaddedAtomicBool,
}
const PING: bool = false;
const PONG: bool = true;
pub struct EtdConfig {
pub iterations: u32,
pub samples: u32,
pub warmup: u32,
pub max_stddev: f64,
}
impl Default for EtdConfig {
fn default() -> Self {
Self {
iterations: 500,
samples: 50,
warmup: 200,
max_stddev: 15.0,
}
}
}
fn measure_pair(cpu_a: usize, cpu_b: usize, config: &EtdConfig) -> Option<Vec<f64>> {
let state = Arc::new(SharedState {
barrier: Barrier::new(2),
flag: PaddedAtomicBool::new(PING),
});
let clock = Arc::new(Clock::new());
let num_round_trips = config.iterations as usize;
let num_samples = config.samples as usize;
let warmup_trips = config.warmup as usize;
let state_pong = Arc::clone(&state);
let state_ping = Arc::clone(&state);
let clock_ping = Arc::clone(&clock);
crossbeam_utils::thread::scope(|s| {
let pong = s.spawn(move |_| {
let core_id = core_affinity::CoreId { id: cpu_b };
if !core_affinity::set_for_current(core_id) {
return;
}
unsafe {
#[cfg(target_env = "gnu")]
let param = libc::sched_param { sched_priority: 99 };
#[cfg(target_env = "musl")]
let param = libc::sched_param {
sched_priority: 99,
sched_ss_low_priority: 0,
sched_ss_repl_period: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_init_budget: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_max_repl: 0,
};
libc::sched_setscheduler(0, libc::SCHED_FIFO, ¶m);
}
state_pong.barrier.wait();
for _ in 0..warmup_trips {
while state_pong
.flag
.val
.compare_exchange(PING, PONG, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
std::hint::spin_loop();
}
}
for _ in 0..(num_round_trips * num_samples) {
while state_pong
.flag
.val
.compare_exchange(PING, PONG, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
std::hint::spin_loop();
}
}
unsafe {
#[cfg(target_env = "gnu")]
let param = libc::sched_param { sched_priority: 0 };
#[cfg(target_env = "musl")]
let param = libc::sched_param {
sched_priority: 0,
sched_ss_low_priority: 0,
sched_ss_repl_period: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_init_budget: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_max_repl: 0,
};
libc::sched_setscheduler(0, libc::SCHED_OTHER, ¶m);
}
});
let ping = s.spawn(move |_| {
let core_id = core_affinity::CoreId { id: cpu_a };
if !core_affinity::set_for_current(core_id) {
return None;
}
unsafe {
#[cfg(target_env = "gnu")]
let param = libc::sched_param { sched_priority: 99 };
#[cfg(target_env = "musl")]
let param = libc::sched_param {
sched_priority: 99,
sched_ss_low_priority: 0,
sched_ss_repl_period: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_init_budget: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_max_repl: 0,
};
libc::sched_setscheduler(0, libc::SCHED_FIFO, ¶m);
}
let mut results = Vec::with_capacity(num_samples);
state_ping.barrier.wait();
for _ in 0..warmup_trips {
while state_ping
.flag
.val
.compare_exchange(PONG, PING, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
std::hint::spin_loop();
}
}
for _ in 0..num_samples {
let start = clock_ping.raw();
for _ in 0..num_round_trips {
while state_ping
.flag
.val
.compare_exchange(PONG, PING, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
std::hint::spin_loop();
}
}
let end = clock_ping.raw();
let duration_ns = clock_ping.delta(start, end).as_nanos() as f64;
results.push(duration_ns / (num_round_trips as f64 * 2.0));
}
unsafe {
#[cfg(target_env = "gnu")]
let param = libc::sched_param { sched_priority: 0 };
#[cfg(target_env = "musl")]
let param = libc::sched_param {
sched_priority: 0,
sched_ss_low_priority: 0,
sched_ss_repl_period: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_init_budget: libc::timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_max_repl: 0,
};
libc::sched_setscheduler(0, libc::SCHED_OTHER, ¶m);
}
Some(results)
});
pong.join().unwrap();
ping.join().unwrap()
})
.ok()?
}
pub fn calibrate_full_matrix<F>(
nr_cpus: usize,
config: &EtdConfig,
mut progress_callback: F,
) -> Vec<Vec<f64>>
where
F: FnMut(usize, usize, bool),
{
let mut matrix = vec![vec![0.0; nr_cpus]; nr_cpus];
info!(
"ETD: Starting calibration for {} CPUs ({} iterations × {} samples)",
nr_cpus, config.iterations, config.samples
);
let start = std::time::Instant::now();
let total_pairs = (nr_cpus * (nr_cpus - 1)) / 2;
let mut current_pair = 0;
#[allow(clippy::needless_range_loop)]
for cpu_a in 0..nr_cpus {
for cpu_b in (cpu_a + 1)..nr_cpus {
current_pair += 1;
let mut retry_count = 0;
const MAX_RETRIES: u32 = 3;
while let Some(samples) = measure_pair(cpu_a, cpu_b, config) {
if !samples.is_empty() {
let n = samples.len() as f64;
let mean = samples.iter().sum::<f64>() / n;
let variance = samples.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
let stddev = variance.sqrt();
if stddev <= config.max_stddev || retry_count >= MAX_RETRIES {
let mut sorted = samples;
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
let median = sorted[sorted.len() / 2];
matrix[cpu_a][cpu_b] = median;
matrix[cpu_b][cpu_a] = median;
if stddev > config.max_stddev {
debug!(
"ETD: CPU {}<->{} stddev={:.1}ns (exceeded threshold after {} retries)",
cpu_a, cpu_b, stddev, retry_count
);
}
progress_callback(current_pair, total_pairs, false);
break;
} else {
retry_count += 1;
debug!(
"ETD: CPU {}<->{} stddev={:.1}ns > {:.1}ns, retrying ({}/{})",
cpu_a, cpu_b, stddev, config.max_stddev, retry_count, MAX_RETRIES
);
}
} else {
break; }
}
}
}
progress_callback(total_pairs, total_pairs, true);
let elapsed = start.elapsed();
info!("ETD: Calibration complete in {:.2}s", elapsed.as_secs_f64());
debug!("ETD: Latency matrix (ns):");
for (i, row) in matrix.iter().enumerate() {
debug!(
" CPU {:2}: {:?}",
i,
row.iter().map(|v| format!("{:.1}", v)).collect::<Vec<_>>()
);
}
matrix
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_measure_pair_smoke() {
let config = EtdConfig {
iterations: 100,
samples: 2,
warmup: 10,
max_stddev: 100.0,
};
let result = measure_pair(0, 1, &config);
if let Some(latencies) = result {
for latency in &latencies {
assert!(*latency > 0.0, "Latency should be positive");
assert!(
*latency < 1_000_000.0,
"Latency should be reasonable (<1ms)"
);
}
}
}
}