use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "async")]
use tokio::time::sleep;
#[cfg(not(feature = "async"))]
use std::thread::sleep;
#[cfg(feature = "async")]
use tokio::sync::RwLock; #[cfg(not(feature = "async"))]
use std::sync::RwLock;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use log::*;
#[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
use core::arch::x86_64::_rdtsc;
/// Returns the current raw time-stamp counter value via the `_rdtsc` intrinsic.
///
/// Only compiled when the `tsc` feature is enabled on x86_64 Linux. The value is a raw
/// counter reading; converting it to time is left to the caller.
#[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
fn get_time() -> u64 {
// SAFETY: `_rdtsc` takes no arguments and dereferences no pointers; the cfg gate above
// restricts this function to x86_64 targets.
// NOTE(review): assumes the OS has not disabled user-mode RDTSC — confirm for sandboxed hosts.
unsafe { _rdtsc() }
}
#[cfg(not(all(feature = "tsc", target_os = "linux", target_arch = "x86_64")))]
use std::time::Instant;
/// UTC clock that extrapolates from a baseline using a monotonic reference.
///
/// A baseline pair (UTC timestamp plus monotonic reading) is captured when the clock is
/// initialised and replaced by `reset_baselines`; `now` adds the monotonic time elapsed since
/// the baseline to the UTC timestamp. The monotonic reference is the TSC when the `tsc`
/// feature is enabled on x86_64 Linux and `std::time::Instant` in every other build.
pub struct HighPrecisionClock {
    /// UTC timestamp captured together with the monotonic baseline.
    base_datetime: DateTime<Utc>,
    /// Drift magnitude, in nanoseconds, at or above which a recalibration logs a warning
    /// instead of a trace message.
    warning_threshold_ns: i64,
    /// Monotonic baseline used by non-TSC builds.
    #[cfg(not(all(feature = "tsc", target_os = "linux", target_arch = "x86_64")))]
    base_instant: Instant,
    /// Raw TSC reading taken at the baseline (TSC builds only).
    #[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
    base_tsc: u64,
    /// Frequency, in Hz, used to convert TSC cycles into nanoseconds (TSC builds only).
    #[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
    cpu_frequency_hz: f64,
}
impl HighPrecisionClock {
/// Creates a shared clock and starts its background drift-correction worker.
///
/// * `warning_threshold_ns` - drift magnitude (ns) at or above which a recalibration is
///   logged as a warning.
/// * `calibration_int` - pause between two consecutive recalibrations.
///
/// The worker is a Tokio task when the `async` feature is enabled and an OS thread
/// otherwise. It owns a clone of the returned `Arc` and loops forever, so the clock is
/// never dropped once created.
///
/// NOTE(review): with the `async` feature this calls `tokio::spawn`, which presumably
/// requires an active Tokio runtime — confirm at call sites.
pub fn new(warning_threshold_ns: i64, calibration_int: Duration) -> Arc<RwLock<Self>> {
    let shared = Arc::new(RwLock::new(Self::initialize_clock(warning_threshold_ns)));
    let worker_handle = Arc::clone(&shared);

    // `None` means TSC mode is not compiled in, so only an explicit `false` is reported.
    if matches!(Self::check_invariant_tsc(), Some(false)) {
        error!("TSC is NOT invariant while TSC mode is enabled");
    }

    #[cfg(feature = "async")]
    tokio::spawn(async move {
        info!("Starting time drift correction (async)");
        Self::periodic_drift_correction(calibration_int, worker_handle).await;
    });

    #[cfg(not(feature = "async"))]
    std::thread::spawn(move || {
        info!("Starting time drift correction (sync)");
        Self::periodic_drift_correction(calibration_int, worker_handle);
    });

    shared
}
/// Reports whether the CPU advertises an invariant TSC.
///
/// Returns `None` when TSC mode is not compiled in. In TSC mode it returns `Some(true)`
/// only if `/proc/cpuinfo` can be read and contains both `constant_tsc` and `nonstop_tsc`;
/// an unreadable file yields `Some(false)`.
fn check_invariant_tsc() -> Option<bool> {
    // Exactly one of the two blocks below survives cfg-stripping and becomes the tail
    // expression, so neither build is left with unreachable code.
    #[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
    {
        let invariant = std::fs::read_to_string("/proc/cpuinfo")
            .map(|cpuinfo| cpuinfo.contains("constant_tsc") && cpuinfo.contains("nonstop_tsc"))
            .unwrap_or(false);
        Some(invariant)
    }
    #[cfg(not(all(feature = "tsc", target_os = "linux", target_arch = "x86_64")))]
    {
        None
    }
}
/// Builds a clock whose baselines are sampled right now.
///
/// The UTC timestamp is read first and the monotonic reference (TSC or `Instant`,
/// depending on the build) immediately afterwards. TSC builds additionally look up the
/// CPU frequency via `get_cpu_frequency`.
fn initialize_clock(warning_threshold_ns: i64) -> Self {
    // Shared by both build flavours; sampled before the monotonic reference.
    let base_datetime = Utc::now();

    #[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
    {
        let base_tsc = get_time();
        let cpu_frequency_hz = Self::get_cpu_frequency();
        Self {
            base_tsc,
            base_datetime,
            cpu_frequency_hz,
            warning_threshold_ns,
        }
    }
    #[cfg(not(all(feature = "tsc", target_os = "linux", target_arch = "x86_64")))]
    {
        Self {
            base_instant: Instant::now(),
            base_datetime,
            warning_threshold_ns,
        }
    }
}
/// Reads the CPU frequency, in Hz, from the first usable `cpu MHz` entry of `/proc/cpuinfo`.
///
/// Entries that cannot be parsed, or that are zero, negative or non-finite, are skipped:
/// such a value would later be used as a divisor when converting cycles to nanoseconds.
///
/// NOTE(review): `cpu MHz` is presumably the core's current clock, which may differ from
/// the rate at which the TSC ticks — confirm this is the intended calibration source.
///
/// # Panics
///
/// Panics if `/proc/cpuinfo` cannot be read or contains no usable `cpu MHz` entry.
// Only the TSC build of `initialize_clock` calls this; the allow keeps other builds
// free of dead-code warnings.
#[allow(dead_code)]
fn get_cpu_frequency() -> f64 {
    let cpuinfo = std::fs::read_to_string("/proc/cpuinfo")
        .expect("Failed to read /proc/cpuinfo");

    let freq_mhz = cpuinfo
        .lines()
        .filter(|line| line.starts_with("cpu MHz"))
        .filter_map(|line| line.split(':').nth(1))
        .filter_map(|value| value.trim().parse::<f64>().ok())
        .find(|mhz| mhz.is_finite() && *mhz > 0.0)
        .expect("Could not determine CPU frequency from /proc/cpuinfo");

    info!("The CPU frequency is: {} MHz", freq_mhz);
    freq_mhz * 1_000_000.0
}
/// Returns the current UTC time extrapolated from the stored baseline.
///
/// The result is `base_datetime` plus the monotonic time elapsed since the baseline was
/// captured: TSC cycles divided by `cpu_frequency_hz` in TSC builds, `Instant::elapsed`
/// otherwise.
pub fn now(&self) -> DateTime<Utc> {
    #[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
    {
        // A reading that is behind the baseline (e.g. taken on a core whose counter lags)
        // must not underflow: plain `-` panics in debug builds and wraps to an enormous
        // value in release builds. Clamp to "no time elapsed" instead.
        let elapsed_cycles = get_time().saturating_sub(self.base_tsc);
        let elapsed_ns = (elapsed_cycles as f64 / self.cpu_frequency_hz) * 1_000_000_000.0;
        self.base_datetime + ChronoDuration::nanoseconds(elapsed_ns as i64)
    }
    #[cfg(not(all(feature = "tsc", target_os = "linux", target_arch = "x86_64")))]
    {
        // `as_nanos` yields a u128; saturate rather than silently truncating into i64.
        let elapsed_ns = self.base_instant.elapsed().as_nanos().min(i64::MAX as u128) as i64;
        self.base_datetime + ChronoDuration::nanoseconds(elapsed_ns)
    }
}
/// Re-anchors the clock to the system time and logs how far it had drifted.
///
/// `desc` is a free-form label included in the log message. Drift at or above
/// `warning_threshold_ns` is logged with `warn!`, anything smaller with `trace!`.
fn reset_baselines(&mut self, desc: &str) {
    // Take the extrapolated reading first so that nothing but the two clock reads sits
    // between sampling the system time and sampling the monotonic reference; any delay
    // between those two samples becomes a permanent offset of the new baseline.
    let predicted = self.now();
    let sys_time = Utc::now();
    #[cfg(not(all(feature = "tsc", target_os = "linux", target_arch = "x86_64")))]
    {
        self.base_instant = Instant::now();
    }
    #[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
    {
        self.base_tsc = get_time();
    }
    self.base_datetime = sys_time;

    // `num_nanoseconds` returns `None` when the value does not fit in an i64; report
    // that as the largest possible drift rather than as "no drift".
    let drift_ns = (sys_time - predicted)
        .num_nanoseconds()
        .and_then(i64::checked_abs)
        .unwrap_or(i64::MAX);

    if drift_ns >= self.warning_threshold_ns {
        warn!("Significant time drift detected ({}): {:?}(ns)", desc, drift_ns);
    } else {
        trace!("Time drift detected ({}): {:?}(ns)", desc, drift_ns);
    }
}
/// Async drift-correction worker: never returns.
///
/// Each iteration waits for `calibration_int`, takes the write lock and re-anchors the
/// clock through `reset_baselines("async")`.
#[cfg(feature = "async")]
async fn periodic_drift_correction(calibration_int: Duration, clock: Arc<RwLock<Self>>) {
    loop {
        sleep(calibration_int).await;
        // The write guard is a temporary and is released at the end of this statement,
        // before the next sleep begins.
        clock.write().await.reset_baselines("async");
    }
}
#[cfg(not(feature = "async"))]
fn periodic_drift_correction(calibration_int: Duration, clock: Arc<RwLock<Self>>) {
loop {
sleep(calibration_int);
let mut clock = match clock.write() {
Ok(clock) => clock,
Err(e) => {
error!("Failed to acquire clock lock during calibration: {:?}", e);
continue;
}
};
clock.reset_baselines("sync");
}
}
}
// These tests drive the clock through the blocking `std::sync::RwLock` API
// (`read().unwrap()` / `write().unwrap()`), which does not exist on the Tokio lock used
// when the `async` feature is enabled. The module is therefore compiled for sync builds only.
#[cfg(all(test, not(feature = "async")))]
mod tests {
    use super::*;
    use chrono::Duration as ChronoDuration;
    use std::time::Duration;

    /// Magnitude, in nanoseconds, of the gap between the system time and the clock's reading.
    fn drift_ns(clock: &Arc<RwLock<HighPrecisionClock>>) -> i64 {
        (Utc::now() - clock.read().unwrap().now())
            .num_nanoseconds()
            .unwrap_or_default()
            .abs()
    }

    #[test]
    fn test_clock_initialization() {
        let clock = HighPrecisionClock::new(10_000, Duration::from_secs(1));
        let _ = clock.read().unwrap().now();
    }

    #[test]
    fn test_now_close_to_system_time() {
        let clock = HighPrecisionClock::new(10_000, Duration::from_secs(1));
        let clock_time = clock.read().unwrap().now();
        let system_time = Utc::now();
        let drift = system_time - clock_time;
        let drift_ns = drift.num_nanoseconds().unwrap_or_default().abs();
        assert!(drift_ns < 10_000_000, "Drift too large: {} ns", drift_ns);
    }

    #[test]
    fn test_drift_correction() {
        let clock = HighPrecisionClock::new(10_000, Duration::from_secs(1));

        // Push the baseline 1 ms into the past to simulate accumulated drift.
        {
            let mut guard = clock.write().unwrap();
            guard.base_datetime = guard.base_datetime - ChronoDuration::milliseconds(1);
        }
        let drift_before = drift_ns(&clock);

        clock.write().unwrap().reset_baselines("test");
        let drift_after = drift_ns(&clock);

        assert!(drift_after < drift_before, "Drift correction failed");
    }

    #[test]
    fn test_check_invariant_tsc() {
        #[cfg(all(feature = "tsc", target_os = "linux", target_arch = "x86_64"))]
        {
            let invariant_tsc = HighPrecisionClock::check_invariant_tsc();
            assert!(invariant_tsc.is_some(), "TSC check should be available on x86_64 Linux");
        }
        #[cfg(not(all(feature = "tsc", target_os = "linux", target_arch = "x86_64")))]
        {
            assert!(
                HighPrecisionClock::check_invariant_tsc().is_none(),
                "TSC check should be unavailable when TSC mode is not compiled in"
            );
        }
    }
}