use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
/// Lock-free counters describing pacer behavior, shared between the
/// emulation thread (writer) and observers (readers) via `Arc`.
///
/// All accesses use `Ordering::Relaxed`: each counter is independent,
/// and readers only need an eventually-consistent view of the values,
/// not any ordering guarantees *across* fields.
#[derive(Debug)]
pub struct PacerStats {
    // Total emulated cycles accounted so far (monotonically increasing).
    emulated_cycles: AtomicU64,
    // Cumulative nanoseconds spent doing actual emulation work.
    emulation_ns: AtomicU64,
    // Cumulative nanoseconds spent busy-waiting to pad quanta out to
    // real time.
    spin_ns: AtomicU64,
    // Number of quanta whose emulation ran longer than the quantum
    // budget (i.e. the pacer fell behind real time).
    behind_count: AtomicU64,
    // Wall-clock nanoseconds since the first quantum began. Overwritten
    // (not accumulated) each quantum.
    wall_ns: AtomicU64,
    // Whether the paced loop is currently running; purely informational.
    running: AtomicBool,
}

impl PacerStats {
    /// Creates a fresh stats block with every counter at zero and
    /// `running` set to `false`.
    pub fn new() -> Self {
        Self {
            emulated_cycles: AtomicU64::new(0),
            emulation_ns: AtomicU64::new(0),
            spin_ns: AtomicU64::new(0),
            behind_count: AtomicU64::new(0),
            wall_ns: AtomicU64::new(0),
            running: AtomicBool::new(false),
        }
    }

    /// Returns a point-in-time copy of all counters.
    ///
    /// Each field is loaded independently with `Relaxed` ordering, so
    /// the snapshot is not guaranteed to be a single atomic cut across
    /// fields — acceptable for statistics reporting.
    pub fn snapshot(&self) -> PacerSnapshot {
        PacerSnapshot {
            emulated_cycles: self.emulated_cycles.load(Ordering::Relaxed),
            emulation_ns: self.emulation_ns.load(Ordering::Relaxed),
            spin_ns: self.spin_ns.load(Ordering::Relaxed),
            behind_count: self.behind_count.load(Ordering::Relaxed),
            wall_ns: self.wall_ns.load(Ordering::Relaxed),
        }
    }

    /// Adds `n` to the total emulated-cycle counter.
    pub(crate) fn add_emulated_cycles(&self, n: u64) {
        self.emulated_cycles.fetch_add(n, Ordering::Relaxed);
    }

    /// Adds `n` nanoseconds to the cumulative emulation-time counter.
    pub(crate) fn add_emulation_ns(&self, n: u64) {
        self.emulation_ns.fetch_add(n, Ordering::Relaxed);
    }

    /// Adds `n` nanoseconds to the cumulative spin-wait counter.
    pub(crate) fn add_spin_ns(&self, n: u64) {
        self.spin_ns.fetch_add(n, Ordering::Relaxed);
    }

    /// Records one quantum that overran its real-time budget.
    pub(crate) fn increment_behind(&self) {
        self.behind_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Overwrites the wall-clock counter with `ns` (a running total
    /// computed by the pacer, not a delta).
    pub(crate) fn set_wall_ns(&self, ns: u64) {
        self.wall_ns.store(ns, Ordering::Relaxed);
    }

    /// Sets the running flag.
    pub fn set_running(&self, val: bool) {
        self.running.store(val, Ordering::Relaxed);
    }

    /// Returns the current value of the running flag.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }
}

impl Default for PacerStats {
    fn default() -> Self {
        Self::new()
    }
}

/// Plain-value copy of [`PacerStats`] taken by [`PacerStats::snapshot`].
///
/// All fields are `u64` nanosecond/cycle counters; see the field docs
/// on `PacerStats` for their meaning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PacerSnapshot {
    pub emulated_cycles: u64,
    pub emulation_ns: u64,
    pub spin_ns: u64,
    pub behind_count: u64,
    pub wall_ns: u64,
}

impl PacerSnapshot {
    /// Fraction of wall time spent on emulation work (0.0 when no wall
    /// time has elapsed yet, avoiding a division by zero).
    pub fn utilization(&self) -> f64 {
        if self.wall_ns == 0 {
            return 0.0;
        }
        self.emulation_ns as f64 / self.wall_ns as f64
    }

    /// Fraction of wall time left over after emulation work
    /// (`1.0 - utilization()`).
    pub fn headroom(&self) -> f64 {
         1.0 - self.utilization()
    }

    /// Effective emulated clock rate in MHz: cycles per wall-clock
    /// nanosecond is GHz, so multiply by 1000. Returns 0.0 before any
    /// wall time has elapsed.
    pub fn emulated_mhz(&self) -> f64 {
        if self.wall_ns == 0 {
            return 0.0;
        }
        self.emulated_cycles as f64 / self.wall_ns as f64 * 1000.0
    }
}
/// Reads the time-stamp counter via `rdtsc`.
///
/// Non-serializing: the CPU may reorder it relative to surrounding
/// instructions, which is acceptable inside the spin loop where it is
/// polled repeatedly.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
fn rdtsc() -> u64 {
    // SAFETY: the `rdtsc` instruction is available on every x86_64 CPU.
    unsafe { std::arch::x86_64::_rdtsc() }
}
/// Reads the time-stamp counter via `rdtscp`, which waits for prior
/// instructions to retire before sampling — preferred here for quantum
/// boundary timestamps.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
fn rdtscp() -> u64 {
    // The aux output (IA32_TSC_AUX, typically a core/node id) is not
    // needed; the intrinsic requires somewhere to write it.
    let mut _aux = 0u32;
    // SAFETY: NOTE(review): `rdtscp` is a later addition than `rdtsc`
    // and is not architecturally guaranteed on every x86_64 part;
    // executing it on a CPU without it raises #UD. This code assumes a
    // CPU recent enough to have it — confirm against target hardware.
    unsafe { std::arch::x86_64::__rdtscp(&mut _aux) }
}
/// Panics unless the CPU advertises an invariant ("constant") TSC,
/// i.e. a time-stamp counter that ticks at a fixed rate regardless of
/// frequency scaling or power states. Without it, TSC-based pacing and
/// the calibration in [`calibrate_tsc`] would be meaningless.
///
/// # Panics
/// Panics if CPUID leaf `0x80000007` is unavailable or if its EDX bit 8
/// (invariant TSC) is clear.
#[cfg(target_arch = "x86_64")]
fn require_constant_tsc() {
    // Querying an unsupported CPUID leaf returns undefined data on some
    // CPUs, so first read the maximum supported extended leaf from
    // 0x8000_0000.
    // SAFETY: the `cpuid` instruction is available on every x86_64 CPU.
    let max_extended_leaf = unsafe { std::arch::x86_64::__cpuid(0x8000_0000) }.eax;
    assert!(
        max_extended_leaf >= 0x8000_0007,
        "CPUID leaf 0x80000007 not supported; cannot verify invariant TSC."
    );
    // SAFETY: `cpuid` is always available, and leaf 0x8000_0007 was
    // just confirmed to be supported.
    let result = unsafe { std::arch::x86_64::__cpuid(0x8000_0007) };
    // Invariant TSC is reported in EDX bit 8 of leaf 0x80000007
    // (Advanced Power Management Information).
    let has_invariant_tsc = (result.edx >> 8) & 1 != 0;
    assert!(
        has_invariant_tsc,
        "CPU does not support invariant TSC (constant_tsc). \
        Required for rdtsc-based real-time pacing."
    );
}
/// Measures the TSC frequency in Hz by counting ticks across a 50 ms
/// sleep timed with the OS monotonic clock ([`std::time::Instant`]).
///
/// # Panics
/// Panics (via [`require_constant_tsc`]) if the CPU lacks an invariant
/// TSC, since calibration would then be invalid.
#[cfg(target_arch = "x86_64")]
fn calibrate_tsc() -> u64 {
    require_constant_tsc();
    let t0 = std::time::Instant::now();
    let tsc0 = rdtscp();
    // 50 ms is long enough that the few-ns skew between the Instant
    // reads and the rdtscp reads is negligible.
    std::thread::sleep(std::time::Duration::from_millis(50));
    let tsc1 = rdtscp();
    let elapsed_ns = t0.elapsed().as_nanos() as u64;
    // Widen to u128 before scaling: `ticks * 1e9` can silently wrap a
    // u64 in release builds if the sleep is stretched far past 50 ms
    // (e.g. the machine suspends mid-calibration). Matches the u128
    // arithmetic used elsewhere in this file.
    ((tsc1 - tsc0) as u128 * 1_000_000_000 / elapsed_ns as u128) as u64
}
/// Estimates the fixed per-quantum bookkeeping overhead, in TSC ticks,
/// of the begin/end quantum machinery.
///
/// Runs batches of dummy quanta that mirror `Pacer::end_quantum`'s exact
/// code path (spin-to-target, stats updates), measures the average ticks
/// per quantum, and subtracts the nominal quantum length; what remains is
/// overhead. The minimum across batches is kept (least-disturbed run),
/// then clamped to `nominal / 4` so a noisy calibration can never eat
/// more than a quarter of the quantum budget.
#[cfg(target_arch = "x86_64")]
fn calibrate_overhead(nominal_quantum_tsc: u64, tsc_freq_hz: u64) -> u64 {
    const BATCHES: usize = 5;
    const PER_BATCH: u64 = 2000;
    // Convert TSC ticks to nanoseconds; u128 intermediate avoids overflow.
    let tsc_to_ns =
        |ticks: u64| -> u64 { (ticks as u128 * 1_000_000_000 / tsc_freq_hz as u128) as u64 };
    // Throwaway stats block so the dummy quanta perform the same atomic
    // updates the real pacer will, keeping the measured cost realistic.
    let stats = PacerStats::new();
    let mut min_overhead = u64::MAX;
    for _ in 0..BATCHES {
        let first = rdtscp();
        let mut quantum_start = first;
        for _ in 0..PER_BATCH {
            // The body below intentionally replicates Pacer::end_quantum.
            let emu_end = rdtscp();
            let emulation_tsc = emu_end - quantum_start;
            let final_tsc = if emulation_tsc < nominal_quantum_tsc {
                // Ahead of schedule: spin until the quantum deadline.
                let target = quantum_start + nominal_quantum_tsc;
                let mut now = emu_end;
                while now < target {
                    std::hint::spin_loop();
                    now = rdtsc();
                }
                let total_tsc = now - quantum_start;
                let spin_tsc = total_tsc - emulation_tsc;
                stats.add_emulation_ns(tsc_to_ns(emulation_tsc));
                stats.add_spin_ns(tsc_to_ns(spin_tsc));
                now
            } else {
                // Behind schedule: no spin, just account the overrun.
                stats.add_emulation_ns(tsc_to_ns(emulation_tsc));
                stats.increment_behind();
                emu_end
            };
            stats.add_emulated_cycles(150);
            let wall_tsc = final_tsc - first;
            stats.set_wall_ns(tsc_to_ns(wall_tsc));
            quantum_start = rdtscp();
        }
        // Average ticks per quantum over the batch, including the final
        // rdtscp; anything above nominal is measured overhead.
        let elapsed = quantum_start - first;
        let per_quantum = elapsed / PER_BATCH;
        let overhead = per_quantum.saturating_sub(nominal_quantum_tsc);
        if overhead < min_overhead {
            min_overhead = overhead;
        }
    }
    // Clamp: never let calibration noise consume more than 25% of the
    // nominal quantum.
    let max_overhead = nominal_quantum_tsc / 4;
    min_overhead.min(max_overhead)
}
/// Paces an emulation loop against real time using the invariant TSC:
/// call `begin_quantum()` before emulating a quantum's worth of cycles
/// and `end_quantum()` after; the pacer spins out any remaining budget.
#[cfg(target_arch = "x86_64")]
pub struct Pacer {
    // Shared, lock-free counters exposed to observers via `stats()`.
    stats: Arc<PacerStats>,
    // Emulated guest cycles accounted per quantum.
    quantum_cycles: u64,
    // Target quantum length in TSC ticks, net of calibrated overhead.
    quantum_tsc_ticks: u64,
    // TSC value captured by the most recent `begin_quantum()`.
    quantum_start_tsc: u64,
    // TSC at the very first `begin_quantum()`; origin for wall_ns.
    first_begin_tsc: Option<u64>,
    // Calibrated TSC frequency in Hz (see `calibrate_tsc`).
    tsc_freq_hz: u64,
    // Emulated system clock in Hz.
    sys_clk_hz: u64,
    // Calibrated per-quantum bookkeeping overhead, in TSC ticks.
    overhead: u64,
}
#[cfg(target_arch = "x86_64")]
impl Pacer {
    /// Creates a pacer with the default quantum of 150 emulated cycles.
    ///
    /// # Panics
    /// See [`Pacer::with_quantum`].
    pub fn new(sys_clk_hz: u32) -> Self {
        Self::with_quantum(sys_clk_hz, 150)
    }

    /// Creates a pacer for an emulated clock of `sys_clk_hz` Hz with
    /// `quantum_cycles` emulated cycles per quantum. Calibrates the TSC
    /// frequency and the per-quantum overhead at construction time.
    ///
    /// # Panics
    /// Panics if `quantum_cycles` is zero, if the resulting quantum is
    /// under 100 TSC ticks (too small to pace meaningfully), or if the
    /// CPU lacks an invariant TSC.
    pub fn with_quantum(sys_clk_hz: u32, quantum_cycles: u64) -> Self {
        assert!(quantum_cycles > 0, "quantum_cycles must be non-zero");
        let tsc_freq_hz = calibrate_tsc();
        // Nominal quantum in TSC ticks: tsc_hz * cycles / sys_hz,
        // computed in u128 to avoid intermediate overflow.
        let nominal = (tsc_freq_hz as u128 * quantum_cycles as u128 / sys_clk_hz as u128) as u64;
        assert!(
            nominal >= 100,
            "quantum too small for TSC resolution (nominal = {} ticks)",
            nominal
        );
        // Pre-shrink the quantum by measured bookkeeping overhead so the
        // end-to-end cadence lands on the nominal rate.
        let overhead = calibrate_overhead(nominal, tsc_freq_hz);
        let quantum_tsc_ticks = nominal.saturating_sub(overhead);
        // Cannot trip today: overhead is clamped to nominal/4 and
        // nominal >= 100, but keep the guard against future changes.
        assert!(quantum_tsc_ticks > 0, "quantum_tsc_ticks is zero");
        Self {
            stats: Arc::new(PacerStats::new()),
            quantum_cycles,
            quantum_tsc_ticks,
            quantum_start_tsc: 0,
            first_begin_tsc: None,
            tsc_freq_hz,
            sys_clk_hz: sys_clk_hz as u64,
            overhead,
        }
    }

    /// Returns a shared handle to the stats block (cheap refcount bump).
    pub fn stats(&self) -> Arc<PacerStats> {
        Arc::clone(&self.stats)
    }

    /// Emulated cycles accounted per quantum.
    pub fn quantum_cycles(&self) -> u64 {
        self.quantum_cycles
    }

    /// Calibrated TSC frequency in Hz.
    pub fn tsc_freq_hz(&self) -> u64 {
        self.tsc_freq_hz
    }

    // Test-only accessor for the effective quantum length in TSC ticks.
    #[cfg(test)]
    pub(crate) fn quantum_tsc_ticks(&self) -> u64 {
        self.quantum_tsc_ticks
    }

    /// Recomputes the quantum length for a new emulated clock frequency.
    /// A zero or unchanged frequency is a no-op. Reuses the overhead
    /// measured at construction, re-clamped to a quarter of the new
    /// nominal quantum.
    #[inline]
    pub fn update_sys_clk_hz(&mut self, new_hz: u32) {
        if new_hz == 0 {
            return;
        }
        let new = new_hz as u64;
        if new == self.sys_clk_hz {
            return;
        }
        self.sys_clk_hz = new;
        let nominal = (self.tsc_freq_hz as u128 * self.quantum_cycles as u128 / new as u128) as u64;
        let effective_overhead = self.overhead.min(nominal / 4);
        // `.max(1)` keeps the quantum non-zero even in degenerate cases.
        self.quantum_tsc_ticks = nominal.saturating_sub(effective_overhead).max(1);
    }

    /// Marks the start of a quantum. The first call also records the
    /// wall-clock origin used for cumulative `wall_ns`.
    #[inline(always)]
    pub fn begin_quantum(&mut self) {
        let tsc = rdtscp();
        self.first_begin_tsc.get_or_insert(tsc);
        self.quantum_start_tsc = tsc;
    }

    /// Marks the end of a quantum: if emulation finished early, spins
    /// until the quantum deadline; otherwise records a "behind" event.
    /// Updates emulation/spin/wall-clock/cycle stats either way.
    #[inline(always)]
    pub fn end_quantum(&mut self) {
        // Relies on rdtscp never returning 0 after boot; a zero start
        // means begin_quantum was never called.
        debug_assert!(
            self.quantum_start_tsc != 0,
            "begin_quantum() must be called before end_quantum()"
        );
        let emu_end = rdtscp();
        let emulation_tsc = emu_end - self.quantum_start_tsc;
        let final_tsc = if emulation_tsc < self.quantum_tsc_ticks {
            // Ahead of real time: busy-wait up to the deadline. The
            // spin polls with the cheaper non-serializing rdtsc.
            let target_tsc = self.quantum_start_tsc + self.quantum_tsc_ticks;
            let mut now = emu_end;
            while now < target_tsc {
                std::hint::spin_loop();
                now = rdtsc();
            }
            let total_tsc = now - self.quantum_start_tsc;
            let spin_tsc = total_tsc - emulation_tsc;
            self.stats.add_emulation_ns(self.tsc_to_ns(emulation_tsc));
            self.stats.add_spin_ns(self.tsc_to_ns(spin_tsc));
            now
        } else {
            // Behind real time: no spin; count the overrun.
            self.stats.add_emulation_ns(self.tsc_to_ns(emulation_tsc));
            self.stats.increment_behind();
            emu_end
        };
        let first = self
            .first_begin_tsc
            .expect("begin_quantum() must be called before end_quantum()");
        // wall_ns is a running total since the first quantum began.
        let wall_tsc = final_tsc - first;
        self.stats.set_wall_ns(self.tsc_to_ns(wall_tsc));
        self.stats.add_emulated_cycles(self.quantum_cycles);
    }

    /// Converts TSC ticks to nanoseconds (u128 intermediate to avoid
    /// overflow).
    #[inline(always)]
    fn tsc_to_ns(&self, tsc_ticks: u64) -> u64 {
        (tsc_ticks as u128 * 1_000_000_000 / self.tsc_freq_hz as u128) as u64
    }
}
// Unit tests. PacerStats/PacerSnapshot tests are pure; Pacer tests are
// x86_64-only because they exercise the real TSC.
#[cfg(test)]
mod tests {
    use super::*;

    // --- PacerStats ---

    #[test]
    fn test_pacer_stats_new() {
        let stats = PacerStats::new();
        let snap = stats.snapshot();
        assert_eq!(snap.emulated_cycles, 0);
        assert_eq!(snap.emulation_ns, 0);
        assert_eq!(snap.spin_ns, 0);
        assert_eq!(snap.behind_count, 0);
        assert_eq!(snap.wall_ns, 0);
        assert!(!stats.is_running());
    }

    #[test]
    fn test_pacer_stats_add_cycles() {
        let stats = PacerStats::new();
        stats.add_emulated_cycles(100);
        stats.add_emulated_cycles(50);
        assert_eq!(stats.snapshot().emulated_cycles, 150);
    }

    #[test]
    fn test_pacer_stats_snapshot() {
        let stats = PacerStats::new();
        stats.add_emulated_cycles(1000);
        stats.add_emulation_ns(500);
        stats.add_spin_ns(300);
        stats.increment_behind();
        stats.increment_behind();
        let snap = stats.snapshot();
        assert_eq!(snap.emulated_cycles, 1000);
        assert_eq!(snap.emulation_ns, 500);
        assert_eq!(snap.spin_ns, 300);
        assert_eq!(snap.behind_count, 2);
        assert_eq!(snap.wall_ns, 0);
    }

    #[test]
    fn test_pacer_stats_running() {
        let stats = PacerStats::new();
        assert!(!stats.is_running());
        stats.set_running(true);
        assert!(stats.is_running());
        stats.set_running(false);
        assert!(!stats.is_running());
    }

    // --- PacerSnapshot derived metrics ---

    #[test]
    fn test_snapshot_utilization_zero() {
        let snap = PacerSnapshot {
            emulated_cycles: 0,
            emulation_ns: 0,
            spin_ns: 0,
            behind_count: 0,
            wall_ns: 0,
        };
        assert_eq!(snap.utilization(), 0.0);
    }

    #[test]
    fn test_snapshot_utilization_half() {
        let snap = PacerSnapshot {
            emulated_cycles: 0,
            emulation_ns: 500,
            spin_ns: 500,
            behind_count: 0,
            wall_ns: 1000,
        };
        assert!((snap.utilization() - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn test_snapshot_utilization_full() {
        let snap = PacerSnapshot {
            emulated_cycles: 0,
            emulation_ns: 1000,
            spin_ns: 0,
            behind_count: 0,
            wall_ns: 1000,
        };
        assert!((snap.utilization() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_snapshot_headroom() {
        let snap = PacerSnapshot {
            emulated_cycles: 0,
            emulation_ns: 300,
            spin_ns: 700,
            behind_count: 0,
            wall_ns: 1000,
        };
        assert!((snap.headroom() - 0.7).abs() < f64::EPSILON);
    }

    #[test]
    fn test_snapshot_emulated_mhz() {
        // 150_000 cycles over 1 ms of wall time => 150 MHz.
        let snap = PacerSnapshot {
            emulated_cycles: 150_000,
            emulation_ns: 500_000,
            spin_ns: 500_000,
            behind_count: 0,
            wall_ns: 1_000_000,
        };
        assert!((snap.emulated_mhz() - 150.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_snapshot_emulated_mhz_zero() {
        let snap = PacerSnapshot {
            emulated_cycles: 100,
            emulation_ns: 0,
            spin_ns: 0,
            behind_count: 0,
            wall_ns: 0,
        };
        assert_eq!(snap.emulated_mhz(), 0.0);
    }

    // --- Pacer (hardware-dependent; x86_64 only) ---

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_pacer_creation() {
        let pacer = Pacer::new(150_000_000);
        assert_eq!(pacer.quantum_cycles(), 150);
        assert!(pacer.tsc_freq_hz() > 0, "TSC frequency must be non-zero");
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_pacer_with_quantum() {
        let pacer = Pacer::with_quantum(150_000_000, 300);
        assert_eq!(pacer.quantum_cycles(), 300);
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_pacer_stats_shared() {
        let pacer = Pacer::new(150_000_000);
        let stats1 = pacer.stats();
        let stats2 = pacer.stats();
        assert!(Arc::ptr_eq(&stats1, &stats2));
        stats1.add_emulated_cycles(42);
        assert_eq!(stats2.snapshot().emulated_cycles, 42);
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_pacer_begin_end_quantum() {
        let mut pacer = Pacer::new(150_000_000);
        pacer.begin_quantum();
        // black_box keeps the busy-work from being optimized away.
        let mut dummy = 0u64;
        for i in 0..1000 {
            dummy = std::hint::black_box(dummy.wrapping_add(i));
        }
        std::hint::black_box(dummy);
        pacer.end_quantum();
        let snap = pacer.stats().snapshot();
        assert!(snap.emulation_ns > 0, "emulation_ns should be non-zero");
        assert_eq!(snap.emulated_cycles, 150);
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_pacer_behind_detection() {
        // u32::MAX Hz makes the quantum budget tiny, so any real work
        // overruns it and must be counted as "behind".
        let mut pacer = Pacer::new(u32::MAX);
        pacer.begin_quantum();
        let mut dummy = 0u64;
        for i in 0..10_000 {
            dummy = std::hint::black_box(dummy.wrapping_add(i));
        }
        std::hint::black_box(dummy);
        pacer.end_quantum();
        let snap = pacer.stats().snapshot();
        assert!(
            snap.behind_count > 0,
            "should detect being behind real-time"
        );
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_tsc_to_ns_via_pacer() {
        let mut pacer = Pacer::new(150_000_000);
        pacer.begin_quantum();
        pacer.end_quantum();
        let snap = pacer.stats().snapshot();
        let total = snap.emulation_ns + snap.spin_ns;
        assert!(total > 0, "total ns should be non-zero after a quantum");
        assert!(
            total < 1_000_000_000,
            "a single quantum should take < 1 second"
        );
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_pacer_wall_ns_after_quantum() {
        let mut pacer = Pacer::new(150_000_000);
        pacer.begin_quantum();
        pacer.end_quantum();
        let snap = pacer.stats().snapshot();
        assert!(
            snap.wall_ns > 0,
            "wall_ns should be non-zero after a quantum"
        );
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_pacer_wall_ns_monotonic() {
        // wall_ns is cumulative since the first begin_quantum, so it
        // must strictly grow across quanta.
        let mut pacer = Pacer::new(150_000_000);
        let mut last = 0u64;
        for _ in 0..5 {
            pacer.begin_quantum();
            pacer.end_quantum();
            let wall = pacer.stats().snapshot().wall_ns;
            assert!(
                wall > last,
                "wall_ns should grow each quantum: {} > {}",
                wall,
                last
            );
            last = wall;
        }
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_calibrate_overhead_clamped_tiny_nominal() {
        let overhead = calibrate_overhead(200, 3_000_000_000);
        assert!(overhead <= 50, "clamp should limit overhead to nominal/4");
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_calibrate_overhead_reasonable() {
        let overhead = calibrate_overhead(3000, 3_000_000_000);
        assert!(overhead < 3000, "overhead should be below nominal");
        assert!(overhead <= 750, "overhead should be clamped to nominal/4");
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_update_sys_clk_hz_zero_keeps_previous() {
        let mut pacer = Pacer::new(6_500_000);
        let before = pacer.quantum_tsc_ticks();
        pacer.update_sys_clk_hz(0);
        assert_eq!(
            pacer.quantum_tsc_ticks(),
            before,
            "zero-Hz update must preserve previous quantum_tsc_ticks"
        );
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_update_sys_clk_hz_changes_quantum() {
        let mut pacer = Pacer::new(6_500_000);
        let old_ticks = pacer.quantum_tsc_ticks();
        assert!(old_ticks > 0, "baseline quantum must be non-zero");
        pacer.update_sys_clk_hz(150_000_000);
        let new_ticks = pacer.quantum_tsc_ticks();
        assert!(new_ticks > 0, "new quantum must be non-zero");
        assert!(
            new_ticks < old_ticks,
            "150 MHz quantum should be smaller than 6.5 MHz quantum (old={}, new={})",
            old_ticks,
            new_ticks
        );
        // Nominal ratio is ~23x; the band is wide because the fixed
        // overhead subtraction biases the effective ratio.
        let ratio = old_ticks as f64 / new_ticks as f64;
        assert!(
            (10.0..=100.0).contains(&ratio),
            "ratio should be ~23x (overhead-biased), got {:.2} (old={}, new={})",
            ratio,
            old_ticks,
            new_ticks
        );
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_update_sys_clk_hz_noop_when_same() {
        let mut pacer = Pacer::new(6_500_000);
        let before = pacer.quantum_tsc_ticks();
        pacer.update_sys_clk_hz(6_500_000);
        pacer.update_sys_clk_hz(6_500_000);
        assert_eq!(
            pacer.quantum_tsc_ticks(),
            before,
            "same-frequency update must be a no-op"
        );
    }
}