use crate::combinator::hedge::HedgeConfig;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Converts a `Duration` to whole nanoseconds as a `u64`, saturating at
/// `u64::MAX` for durations too large to represent (~584 years and up).
fn duration_nanos_saturating_u64(duration: Duration) -> u64 {
    // `as_nanos` yields a u128; `try_from` fails exactly when it exceeds
    // u64::MAX, which is the saturation case.
    u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
}
/// Scales `current` by `decay_factor` in floating point and maps the result
/// back into `u64`: non-finite or non-positive results become 0, results at
/// or above `u64::MAX` saturate, everything else truncates.
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss,
    clippy::cast_precision_loss
)]
fn decay_nanos(current: u64, decay_factor: f64) -> u64 {
    let scaled = current as f64 * decay_factor;
    match scaled {
        // NaN, infinities, zero, and negatives all collapse to the floor.
        s if !s.is_finite() || s <= 0.0 => 0,
        // Guard against values that would overflow the integer range.
        s if s >= u64::MAX as f64 => u64::MAX,
        s => s as u64,
    }
}
/// Adaptive hedge-delay controller based on a "peak EWMA" estimate: larger
/// RTT samples replace the estimate immediately, while smaller samples let
/// it decay geometrically toward the floor (see `observe`).
#[derive(Debug)]
pub struct PeakEwmaHedgeController {
    // Current delay estimate in nanoseconds, updated lock-free via CAS.
    estimate_nanos: AtomicU64,
    // Lower bound on the estimate, in nanoseconds.
    min_delay: u64,
    // Upper bound on the estimate, in nanoseconds.
    max_delay: u64,
    // Multiplicative decay applied per observation; validated to be finite
    // and in (0, 1] by `new`.
    decay_factor: f64,
}
impl PeakEwmaHedgeController {
    /// Creates a controller with the given initial estimate, bounds, and
    /// per-observation decay factor. The initial estimate is clamped into
    /// `[min_delay, max_delay]`.
    ///
    /// # Panics
    ///
    /// Panics if `decay_factor` is not finite or not in `(0, 1]`, or if
    /// `min_delay > max_delay` (compared at nanosecond resolution).
    #[must_use]
    pub fn new(
        initial: Duration,
        min_delay: Duration,
        max_delay: Duration,
        decay_factor: f64,
    ) -> Self {
        assert!(
            decay_factor.is_finite() && decay_factor > 0.0 && decay_factor <= 1.0,
            "decay_factor must be finite and in (0, 1]"
        );
        let min_nanos = duration_nanos_saturating_u64(min_delay);
        let max_nanos = duration_nanos_saturating_u64(max_delay);
        assert!(min_nanos <= max_nanos, "min_delay must be <= max_delay");
        // Start inside the configured bounds even if `initial` falls outside.
        let initial_nanos = duration_nanos_saturating_u64(initial).clamp(min_nanos, max_nanos);
        Self {
            estimate_nanos: AtomicU64::new(initial_nanos),
            min_delay: min_nanos,
            max_delay: max_nanos,
            decay_factor,
        }
    }

    /// Default tuning for RPC-style workloads: 50 ms initial estimate,
    /// bounded to [10 ms, 500 ms], with a slow 0.99 per-observation decay.
    #[must_use]
    pub fn default_rpc() -> Self {
        Self::new(
            Duration::from_millis(50),
            Duration::from_millis(10),
            Duration::from_millis(500),
            0.99,
        )
    }

    /// Records an observed round-trip time.
    ///
    /// The new estimate is `max(sample, decayed_previous)` clamped to the
    /// configured bounds, applied atomically; `fetch_update` retries the CAS
    /// for us when another thread races the update.
    pub fn observe(&self, rtt: Duration) {
        let sample = duration_nanos_saturating_u64(rtt);
        // The closure always returns Some, so the update cannot fail; the
        // Result is ignored deliberately.
        let _ = self
            .estimate_nanos
            .fetch_update(Ordering::Release, Ordering::Acquire, |current| {
                let decayed = decay_nanos(current, self.decay_factor);
                Some(sample.max(decayed).clamp(self.min_delay, self.max_delay))
            });
    }

    /// Returns the hedge configuration derived from the current estimate.
    #[must_use]
    pub fn current_config(&self) -> HedgeConfig {
        // The stored estimate is already bounded by `new`/`observe`; clamp
        // again so the returned config stays in range even if that invariant
        // ever weakens.
        let delay_nanos = self
            .estimate_nanos
            .load(Ordering::Relaxed)
            .clamp(self.min_delay, self.max_delay);
        HedgeConfig::new(Duration::from_nanos(delay_nanos))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;

    // White-box helper: reads the raw atomic estimate (nanoseconds) so tests
    // can assert on the stored state, not just the returned config.
    fn raw_estimate_nanos(controller: &PeakEwmaHedgeController) -> u64 {
        controller.estimate_nanos.load(Ordering::Relaxed)
    }

    // A sample larger than the estimate must replace it immediately
    // (peak tracking — no gradual ramp-up).
    #[test]
    fn adaptive_hedge_tracks_peak_instantly() {
        let controller = PeakEwmaHedgeController::default_rpc();
        assert_eq!(
            controller.current_config().hedge_delay,
            Duration::from_millis(50)
        );
        controller.observe(Duration::from_millis(200));
        assert_eq!(
            controller.current_config().hedge_delay,
            Duration::from_millis(200)
        );
    }

    // A small sample only decays the estimate by one step:
    // 200 ms * 0.99 = 198 ms, not a jump down to the 10 ms sample.
    #[test]
    fn adaptive_hedge_decays_slowly() {
        let controller = PeakEwmaHedgeController::default_rpc();
        controller.observe(Duration::from_millis(200));
        controller.observe(Duration::from_millis(10));
        let delay = controller.current_config().hedge_delay.as_millis();
        assert_eq!(delay, 198);
    }

    // A sample above max_delay is clamped both in the returned config and in
    // the raw atomic state (the stored estimate never escapes the bounds).
    #[test]
    fn adaptive_hedge_respects_bounds() {
        let controller = PeakEwmaHedgeController::default_rpc();
        controller.observe(Duration::from_secs(1));
        assert_eq!(
            controller.current_config().hedge_delay,
            Duration::from_millis(500)
        );
        assert_eq!(
            raw_estimate_nanos(&controller),
            duration_nanos_saturating_u64(Duration::from_millis(500))
        );
    }

    // The update rule is max(sample, decayed_estimate): a 199 ms sample beats
    // the decayed 198 ms (200 ms * 0.99), so the peak-EWMA max equation wins
    // over plain decay.
    #[test]
    fn adaptive_hedge_uses_peak_ewma_max_equation() {
        let controller = PeakEwmaHedgeController::new(
            Duration::from_millis(200),
            Duration::from_millis(1),
            Duration::from_secs(1),
            0.99,
        );
        controller.observe(Duration::from_millis(199));
        assert_eq!(
            controller.current_config().hedge_delay,
            Duration::from_millis(199)
        );
    }

    // An out-of-range initial estimate is clamped into [min, max] at
    // construction time.
    #[test]
    fn adaptive_hedge_clamps_initial_delay() {
        let controller = PeakEwmaHedgeController::new(
            Duration::from_secs(1),
            Duration::from_millis(10),
            Duration::from_millis(50),
            0.99,
        );
        assert_eq!(
            controller.current_config().hedge_delay,
            Duration::from_millis(50)
        );
    }

    // min_delay > max_delay must be rejected with the documented panic.
    #[test]
    #[should_panic(expected = "min_delay must be <= max_delay")]
    fn adaptive_hedge_rejects_inverted_bounds() {
        let _ = PeakEwmaHedgeController::new(
            Duration::from_millis(20),
            Duration::from_millis(50),
            Duration::from_millis(10),
            0.99,
        );
    }

    // decay_factor outside (0, 1] must be rejected with the documented panic.
    #[test]
    #[should_panic(expected = "decay_factor must be finite and in (0, 1]")]
    fn adaptive_hedge_rejects_invalid_decay_factor() {
        let _ = PeakEwmaHedgeController::new(
            Duration::from_millis(20),
            Duration::from_millis(10),
            Duration::from_millis(50),
            1.5,
        );
    }

    // Duration::from_secs(u64::MAX) overflows u64 nanoseconds; the conversion
    // must saturate (not wrap) and the result still clamps to max_delay.
    #[test]
    fn adaptive_hedge_saturates_huge_duration_samples() {
        let controller = PeakEwmaHedgeController::new(
            Duration::from_millis(20),
            Duration::from_millis(10),
            Duration::from_millis(500),
            0.99,
        );
        controller.observe(Duration::from_secs(u64::MAX));
        assert_eq!(
            controller.current_config().hedge_delay,
            Duration::from_millis(500)
        );
    }

    // The clamp must be applied to the stored atomic value itself, not just
    // at read time in current_config.
    #[test]
    fn adaptive_hedge_clamps_raw_atomic_state_at_upper_bound() {
        let controller = PeakEwmaHedgeController::new(
            Duration::from_millis(20),
            Duration::from_millis(10),
            Duration::from_millis(50),
            0.99,
        );
        controller.observe(Duration::from_secs(1));
        assert_eq!(
            raw_estimate_nanos(&controller),
            duration_nanos_saturating_u64(Duration::from_millis(50))
        );
    }

    // Repeated zero samples decay the estimate (20 -> 10 with factor 0.5) but
    // the stored value must bottom out at min_delay, never below.
    #[test]
    fn adaptive_hedge_clamps_raw_atomic_state_at_lower_bound_after_decay() {
        let controller = PeakEwmaHedgeController::new(
            Duration::from_millis(20),
            Duration::from_millis(10),
            Duration::from_millis(50),
            0.5,
        );
        for _ in 0..8 {
            controller.observe(Duration::ZERO);
        }
        assert_eq!(
            raw_estimate_nanos(&controller),
            duration_nanos_saturating_u64(Duration::from_millis(10))
        );
    }

    // Hammer observe() from four threads with mixed in/out-of-range samples;
    // the raw atomic state must stay within [min_delay, max_delay] at every
    // point each thread happens to read it, and after all threads join.
    #[test]
    fn adaptive_hedge_keeps_atomic_state_bounded_under_multithreaded_observe_contention() {
        let controller = Arc::new(PeakEwmaHedgeController::new(
            Duration::from_millis(20),
            Duration::from_millis(10),
            Duration::from_millis(50),
            0.95,
        ));
        // Barrier of 5: four workers plus the main thread, so all workers
        // start observing at the same instant to maximize CAS contention.
        let start = Arc::new(Barrier::new(5));
        let samples = [
            Duration::ZERO,
            Duration::from_millis(5),
            Duration::from_millis(25),
            Duration::from_millis(200),
        ];
        let handles: Vec<_> = (0..4)
            .map(|thread_idx| {
                let controller = Arc::clone(&controller);
                let start = Arc::clone(&start);
                thread::spawn(move || {
                    start.wait();
                    for round in 0..512 {
                        // Offset by thread index so threads cycle through the
                        // sample set out of phase with each other.
                        let sample = samples[(thread_idx + round) % samples.len()];
                        controller.observe(sample);
                        let raw = raw_estimate_nanos(&controller);
                        assert!(
                            raw >= controller.min_delay && raw <= controller.max_delay,
                            "raw estimate {raw} escaped bounds [{}..={}]",
                            controller.min_delay,
                            controller.max_delay
                        );
                    }
                })
            })
            .collect();
        start.wait();
        for handle in handles {
            handle
                .join()
                .expect("observe() contention thread should not panic");
        }
        let final_raw = raw_estimate_nanos(&controller);
        assert!(
            final_raw >= controller.min_delay && final_raw <= controller.max_delay,
            "final raw estimate {final_raw} escaped bounds [{}..={}]",
            controller.min_delay,
            controller.max_delay
        );
    }

    // Metamorphic property: current_config() is a pure read, so interleaving
    // extra probes while replaying the same sample trace must produce the
    // identical delay trajectory and final atomic state as the baseline run.
    #[test]
    fn metamorphic_replay_with_readonly_probes_is_stable() {
        let trace = [
            Duration::from_millis(80),
            Duration::from_millis(15),
            Duration::from_millis(220),
            Duration::from_millis(5),
            Duration::from_millis(120),
            Duration::ZERO,
        ];
        let baseline = PeakEwmaHedgeController::new(
            Duration::from_millis(50),
            Duration::from_millis(10),
            Duration::from_millis(500),
            0.95,
        );
        let mut baseline_delays = Vec::with_capacity(trace.len());
        for sample in trace {
            baseline.observe(sample);
            baseline_delays.push(baseline.current_config().hedge_delay);
        }
        let replayed = PeakEwmaHedgeController::new(
            Duration::from_millis(50),
            Duration::from_millis(10),
            Duration::from_millis(500),
            0.95,
        );
        let mut replayed_delays = Vec::with_capacity(trace.len());
        for sample in trace {
            // Extra read-only probe before each observation; must not perturb
            // the trajectory and must itself stay within bounds.
            let pre_probe = replayed.current_config();
            assert!(
                pre_probe.hedge_delay >= Duration::from_millis(10)
                    && pre_probe.hedge_delay <= Duration::from_millis(500),
                "readonly probe must stay within configured bounds"
            );
            replayed.observe(sample);
            let post_probe = replayed.current_config();
            replayed_delays.push(post_probe.hedge_delay);
        }
        assert_eq!(
            replayed_delays, baseline_delays,
            "interleaving readonly config probes must not perturb replayed hedge trajectory"
        );
        assert_eq!(
            raw_estimate_nanos(&replayed),
            raw_estimate_nanos(&baseline),
            "readonly probes must not perturb the final atomic hedge estimate"
        );
    }
}