use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::evidence_sink::EvidenceSink;
use crate::lab::chaos::ChaosRng;
use crate::time::TimeSource;
use crate::types::Time;
use franken_evidence::EvidenceLedger;
type SkewNanos = i64;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ComputedSkew {
total_skew: SkewNanos,
jitter_applied: bool,
jump_fired_now: bool,
}
#[derive(Debug, Clone)]
pub struct ClockSkewConfig {
pub seed: u64,
pub static_offset_ns: SkewNanos,
pub drift_rate_ns_per_sec: SkewNanos,
pub jitter_probability: f64,
pub jitter_max_ns: u64,
pub jump: Option<(u64, SkewNanos)>,
}
impl ClockSkewConfig {
#[must_use]
pub const fn new(seed: u64) -> Self {
Self {
seed,
static_offset_ns: 0,
drift_rate_ns_per_sec: 0,
jitter_probability: 0.0,
jitter_max_ns: 0,
jump: None,
}
}
#[must_use]
pub const fn with_static_offset_ms(mut self, ms: i64) -> Self {
self.static_offset_ns = ms.saturating_mul(1_000_000);
self
}
#[must_use]
pub const fn with_static_offset_ns(mut self, ns: SkewNanos) -> Self {
self.static_offset_ns = ns;
self
}
#[must_use]
pub const fn with_drift_rate(mut self, ns_per_sec: SkewNanos) -> Self {
self.drift_rate_ns_per_sec = ns_per_sec;
self
}
#[must_use]
pub fn with_jitter(mut self, probability: f64, max_ns: u64) -> Self {
assert!(
(0.0..=1.0).contains(&probability),
"probability must be in [0.0, 1.0], got {probability}"
);
self.jitter_probability = probability;
self.jitter_max_ns = max_ns;
self
}
#[must_use]
pub const fn with_jump(mut self, trigger_after_ns: u64, jump_offset_ns: SkewNanos) -> Self {
self.jump = Some((trigger_after_ns, jump_offset_ns));
self
}
#[must_use]
pub fn is_enabled(&self) -> bool {
self.static_offset_ns != 0
|| self.drift_rate_ns_per_sec != 0
|| self.jitter_probability > 0.0
|| self.jump.is_some()
}
}
pub struct ClockSkewStats {
reads: AtomicU64,
skewed_reads: AtomicU64,
jitter_count: AtomicU64,
jump_fired: AtomicU64,
max_abs_skew_ns: AtomicU64,
}
impl ClockSkewStats {
fn new() -> Self {
Self {
reads: AtomicU64::new(0),
skewed_reads: AtomicU64::new(0),
jitter_count: AtomicU64::new(0),
jump_fired: AtomicU64::new(0),
max_abs_skew_ns: AtomicU64::new(0),
}
}
fn record_read(&self, abs_skew: u64) {
self.reads.fetch_add(1, Ordering::Relaxed);
if abs_skew > 0 {
self.skewed_reads.fetch_add(1, Ordering::Relaxed);
}
let mut current = self.max_abs_skew_ns.load(Ordering::Relaxed);
while abs_skew > current {
match self.max_abs_skew_ns.compare_exchange_weak(
current,
abs_skew,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
fn record_jitter(&self) {
self.jitter_count.fetch_add(1, Ordering::Relaxed);
}
fn record_jump(&self) {
self.jump_fired.fetch_add(1, Ordering::Relaxed);
}
#[must_use]
pub fn snapshot(&self) -> ClockSkewStatsSnapshot {
ClockSkewStatsSnapshot {
reads: self.reads.load(Ordering::Relaxed),
skewed_reads: self.skewed_reads.load(Ordering::Relaxed),
jitter_count: self.jitter_count.load(Ordering::Relaxed),
jump_fired: self.jump_fired.load(Ordering::Relaxed) > 0,
max_abs_skew_ns: self.max_abs_skew_ns.load(Ordering::Relaxed),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClockSkewStatsSnapshot {
pub reads: u64,
pub skewed_reads: u64,
pub jitter_count: u64,
pub jump_fired: bool,
pub max_abs_skew_ns: u64,
}
pub struct SkewClock {
base: Arc<dyn TimeSource>,
config: ClockSkewConfig,
rng: Mutex<ChaosRng>,
stats: ClockSkewStats,
jump_fired: AtomicU64,
evidence_sink: Arc<dyn EvidenceSink>,
}
impl std::fmt::Debug for SkewClock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let snap = self.stats.snapshot();
f.debug_struct("SkewClock")
.field("static_offset_ns", &self.config.static_offset_ns)
.field("drift_rate_ns_per_sec", &self.config.drift_rate_ns_per_sec)
.field("reads", &snap.reads)
.field("skewed_reads", &snap.skewed_reads)
.finish_non_exhaustive()
}
}
impl SkewClock {
#[must_use]
pub fn new(
base: Arc<dyn TimeSource>,
config: ClockSkewConfig,
evidence_sink: Arc<dyn EvidenceSink>,
) -> Self {
let rng = ChaosRng::new(config.seed);
Self {
base,
rng: Mutex::new(rng),
stats: ClockSkewStats::new(),
jump_fired: AtomicU64::new(0),
evidence_sink,
config,
}
}
#[must_use]
pub fn stats(&self) -> ClockSkewStatsSnapshot {
self.stats.snapshot()
}
fn compute_drift_ns(base_nanos: u64, drift_rate_ns_per_sec: SkewNanos) -> SkewNanos {
let product = i128::from(drift_rate_ns_per_sec).saturating_mul(i128::from(base_nanos));
let drift = product / 1_000_000_000_i128;
if drift > i128::from(i64::MAX) {
i64::MAX
} else if drift < i128::from(i64::MIN) {
i64::MIN
} else {
#[allow(clippy::cast_possible_truncation)]
{
drift as SkewNanos
}
}
}
fn compute_skew(&self, base_nanos: u64) -> ComputedSkew {
let mut total_skew: SkewNanos = self.config.static_offset_ns;
let mut jump_fired_now = false;
let mut jitter_applied = false;
if self.config.drift_rate_ns_per_sec != 0 {
let drift = Self::compute_drift_ns(base_nanos, self.config.drift_rate_ns_per_sec);
total_skew = total_skew.saturating_add(drift);
}
if let Some((trigger_ns, jump_offset)) = self.config.jump {
if base_nanos >= trigger_ns
&& self
.jump_fired
.compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.stats.record_jump();
jump_fired_now = true;
}
if self.jump_fired.load(Ordering::Relaxed) > 0 {
total_skew = total_skew.saturating_add(jump_offset);
}
}
if self.config.jitter_probability > 0.0 && self.config.jitter_max_ns > 0 {
let (should, magnitude, direction) = {
let mut rng = self.rng.lock();
let should = rng.should_inject(self.config.jitter_probability);
let mag = (rng.next_u64() % self.config.jitter_max_ns).saturating_add(1);
let dir = rng.next_u64().is_multiple_of(2);
drop(rng);
(should, mag, dir)
};
if should {
let sign: SkewNanos = if direction { 1 } else { -1 };
#[allow(clippy::cast_possible_wrap)]
let jitter = sign * (magnitude as SkewNanos);
total_skew = total_skew.saturating_add(jitter);
jitter_applied = true;
self.stats.record_jitter();
}
}
ComputedSkew {
total_skew,
jitter_applied,
jump_fired_now,
}
}
fn apply_offset(base_nanos: u64, offset: SkewNanos) -> u64 {
if offset >= 0 {
base_nanos.saturating_add(offset.unsigned_abs())
} else {
base_nanos.saturating_sub(offset.unsigned_abs())
}
}
}
impl TimeSource for SkewClock {
fn now(&self) -> Time {
let base = self.base.now();
let base_nanos = base.as_nanos();
let skew = self.compute_skew(base_nanos);
let skewed_nanos = Self::apply_offset(base_nanos, skew.total_skew);
self.stats.record_read(skew.total_skew.unsigned_abs());
if skew.total_skew != 0 {
let action = if skew.jump_fired_now {
"clock_jump"
} else if skew.jitter_applied {
"clock_jitter"
} else {
"clock_skew"
};
emit_skew_evidence(
&self.evidence_sink,
base.as_millis(),
action,
base_nanos,
skew.total_skew,
);
}
Time::from_nanos(skewed_nanos)
}
}
#[must_use]
pub fn skew_clock(
base: Arc<dyn TimeSource>,
config: ClockSkewConfig,
evidence_sink: Arc<dyn EvidenceSink>,
) -> Arc<SkewClock> {
Arc::new(SkewClock::new(base, config, evidence_sink))
}
#[allow(clippy::cast_precision_loss)]
fn emit_skew_evidence(
sink: &Arc<dyn EvidenceSink>,
ts_unix_ms: u64,
action: &str,
base_ns: u64,
offset_ns: i64,
) {
let entry = EvidenceLedger {
ts_unix_ms,
component: "clock_skew_injector".to_string(),
action: format!("inject_{action}"),
posterior: vec![1.0],
expected_loss_by_action: std::collections::BTreeMap::from([(
format!("inject_{action}"),
0.0,
)]),
chosen_expected_loss: 0.0,
calibration_score: 1.0,
fallback_active: false,
top_features: vec![
("base_time_ns".to_string(), base_ns as f64),
("offset_ns".to_string(), offset_ns as f64),
],
};
sink.emit(&entry);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::evidence_sink::CollectorSink;
use crate::time::VirtualClock;
fn make_base_clock() -> Arc<VirtualClock> {
Arc::new(VirtualClock::new())
}
fn make_sink() -> (Arc<CollectorSink>, Arc<dyn EvidenceSink>) {
let collector = Arc::new(CollectorSink::new());
let sink: Arc<dyn EvidenceSink> = collector.clone();
(collector, sink)
}
#[test]
fn no_skew_passthrough() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(1_000_000_000); assert_eq!(skewed.now(), Time::from_secs(1));
let stats = skewed.stats();
assert_eq!(stats.reads, 1);
assert_eq!(stats.skewed_reads, 0);
}
#[test]
fn static_offset_ahead() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_static_offset_ms(50); let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(1_000_000_000); let t = skewed.now();
assert_eq!(t, Time::from_nanos(1_050_000_000));
let stats = skewed.stats();
assert_eq!(stats.reads, 1);
assert_eq!(stats.skewed_reads, 1);
assert_eq!(stats.max_abs_skew_ns, 50_000_000);
}
#[test]
fn jump_evidence_uses_base_clock_timestamp() {
let base = make_base_clock();
let (collector, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_jump(1_000_000_000, 50_000_000);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(1_000_000_000);
let _ = skewed.now();
let entries = collector.entries();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].action, "inject_clock_jump");
assert_eq!(entries[0].ts_unix_ms, 1_000);
}
#[test]
fn static_offset_emits_skew_evidence() {
let base = make_base_clock();
let (collector, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_static_offset_ms(25);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(1_000_000_000);
let _ = skewed.now();
let entries = collector.entries();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].action, "inject_clock_skew");
assert_eq!(entries[0].ts_unix_ms, 1_000);
}
#[test]
fn static_offset_behind() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_static_offset_ms(-100); let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(1_000_000_000); let t = skewed.now();
assert_eq!(t, Time::from_nanos(900_000_000));
}
#[test]
fn static_offset_saturates_at_zero() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_static_offset_ms(-200);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(100_000_000); let t = skewed.now();
assert_eq!(t, Time::ZERO); }
#[test]
fn drift_makes_clock_run_fast() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_drift_rate(1_000_000);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(10_000_000_000);
let t = skewed.now();
assert_eq!(t, Time::from_nanos(10_010_000_000));
}
#[test]
fn drift_makes_clock_run_slow() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_drift_rate(-2_000_000);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(5_000_000_000);
let t = skewed.now();
assert_eq!(t, Time::from_nanos(4_990_000_000));
}
#[test]
fn jump_fires_once_at_trigger() {
let base = make_base_clock();
let (collector, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_jump(2_000_000_000, 100_000_000);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(1_000_000_000);
assert_eq!(skewed.now(), Time::from_secs(1));
base.advance(1_500_000_000); let t = skewed.now();
assert_eq!(t, Time::from_nanos(2_600_000_000));
base.advance(1_000_000_000); let t2 = skewed.now();
assert_eq!(t2, Time::from_nanos(3_600_000_000));
let stats = skewed.stats();
assert!(stats.jump_fired);
let entries = collector.entries();
assert!(
entries.iter().any(|e| e.action.contains("clock_jump")),
"Expected evidence for clock_jump"
);
}
#[test]
fn jump_backward_simulates_ntp_correction() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_jump(1_000_000_000, -50_000_000);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(2_000_000_000); let t = skewed.now();
assert_eq!(t, Time::from_nanos(1_950_000_000)); }
#[test]
fn jitter_is_deterministic() {
let base1 = make_base_clock();
let base2 = make_base_clock();
let (_, sink1) = make_sink();
let (_, sink2) = make_sink();
let config = ClockSkewConfig::new(42).with_jitter(1.0, 10_000_000);
let skewed1 = SkewClock::new(base1.clone() as Arc<dyn TimeSource>, config.clone(), sink1);
let skewed2 = SkewClock::new(base2.clone() as Arc<dyn TimeSource>, config, sink2);
let mut times1 = Vec::new();
let mut times2 = Vec::new();
for i in 1..=20 {
base1.set(Time::from_secs(i));
base2.set(Time::from_secs(i));
times1.push(skewed1.now());
times2.push(skewed2.now());
}
assert_eq!(
times1, times2,
"Same seed must produce same jitter sequence"
);
}
#[test]
fn one_nanosecond_jitter_does_not_collapse_to_zero() {
let base = make_base_clock();
let (collector, sink) = make_sink();
let config = ClockSkewConfig::new(7).with_jitter(1.0, 1);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.set(Time::from_secs(3));
let observed = skewed.now();
let diff = observed.as_nanos().abs_diff(Time::from_secs(3).as_nanos());
assert_eq!(diff, 1);
let stats = skewed.stats();
assert_eq!(stats.jitter_count, 1);
assert_eq!(stats.skewed_reads, 1);
let entries = collector.entries();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].action, "inject_clock_jitter");
}
#[test]
fn jitter_bounded() {
let base = make_base_clock();
let (_, sink) = make_sink();
let max_jitter_ns: u64 = 5_000_000; let config = ClockSkewConfig::new(42).with_jitter(1.0, max_jitter_ns);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(10_000_000_000);
for _ in 0..100 {
let t = skewed.now();
let diff = if t.as_nanos() >= 10_000_000_000 {
t.as_nanos() - 10_000_000_000
} else {
10_000_000_000 - t.as_nanos()
};
assert!(
diff < max_jitter_ns,
"Jitter {diff}ns exceeds max {max_jitter_ns}ns"
);
}
let stats = skewed.stats();
assert_eq!(stats.jitter_count, 100);
}
#[test]
fn combined_offset_and_drift() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42)
.with_static_offset_ms(10) .with_drift_rate(500_000);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(20_000_000_000);
let t = skewed.now();
assert_eq!(t, Time::from_nanos(20_020_000_000));
}
#[test]
fn stats_track_reads() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_static_offset_ms(1);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
base.advance(1_000_000_000);
for _ in 0..10 {
let _ = skewed.now();
}
let stats = skewed.stats();
assert_eq!(stats.reads, 10);
assert_eq!(stats.skewed_reads, 10);
}
#[test]
fn config_default_disabled() {
let config = ClockSkewConfig::new(42);
assert!(!config.is_enabled());
}
#[test]
fn config_static_offset_enabled() {
let config = ClockSkewConfig::new(42).with_static_offset_ms(1);
assert!(config.is_enabled());
}
#[test]
fn config_drift_enabled() {
let config = ClockSkewConfig::new(42).with_drift_rate(1);
assert!(config.is_enabled());
}
#[test]
fn config_jitter_enabled() {
let config = ClockSkewConfig::new(42).with_jitter(0.5, 1000);
assert!(config.is_enabled());
}
#[test]
#[should_panic(expected = "probability must be in [0.0, 1.0]")]
fn config_rejects_invalid_jitter_probability() {
let _ = ClockSkewConfig::new(42).with_jitter(1.5, 1000);
}
#[test]
fn zero_base_time_with_offset() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_static_offset_ms(100);
let skewed = SkewClock::new(base as Arc<dyn TimeSource>, config, sink);
let t = skewed.now();
assert_eq!(t, Time::from_millis(100));
}
#[test]
fn clock_skew_stats_snapshot_debug_clone_eq() {
let snap = ClockSkewStatsSnapshot {
reads: 100,
skewed_reads: 50,
jitter_count: 10,
jump_fired: false,
max_abs_skew_ns: 5000,
};
let cloned = snap.clone();
assert_eq!(cloned, snap);
let dbg = format!("{snap:?}");
assert!(dbg.contains("ClockSkewStatsSnapshot"));
assert_ne!(
snap,
ClockSkewStatsSnapshot {
reads: 0,
skewed_reads: 0,
jitter_count: 0,
jump_fired: false,
max_abs_skew_ns: 0,
}
);
}
#[test]
fn clock_skew_config_debug_clone() {
let config = ClockSkewConfig::new(42)
.with_static_offset_ms(1)
.with_drift_rate(500);
let cloned = config.clone();
assert_eq!(cloned.seed, 42);
assert_eq!(cloned.static_offset_ns, 1_000_000);
assert_eq!(cloned.drift_rate_ns_per_sec, 500);
let dbg = format!("{config:?}");
assert!(dbg.contains("ClockSkewConfig"));
}
#[test]
fn drift_precision_is_stable_at_large_timestamps() {
let base = make_base_clock();
let (_, sink) = make_sink();
let config = ClockSkewConfig::new(42).with_drift_rate(1_000_000_000);
let skewed = SkewClock::new(base.clone() as Arc<dyn TimeSource>, config, sink);
let base_nanos = 9_007_199_254_740_993_u64;
base.set(Time::from_nanos(base_nanos));
let actual = skewed.now();
let expected = Time::from_nanos(base_nanos.saturating_mul(2));
assert_eq!(actual, expected);
}
}