use std::hash::{BuildHasher, Hash};
use std::time::Duration;
use crate::clock;
use crate::config::TimeoutConfig;
use crate::tracker::LatencyTracker;
#[derive(Default, Clone)]
pub struct AdaptiveTimeout {
config: TimeoutConfig,
}
impl AdaptiveTimeout {
pub fn new(config: TimeoutConfig) -> Self {
Self { config }
}
#[inline]
pub fn select_timeout<'a, D, I, H, const N: usize>(
&self,
tracker: &mut LatencyTracker<D, I, H, N>,
destinations: impl IntoIterator<Item = &'a D>,
attempt: u32,
now: I,
) -> Duration
where
D: Hash + Eq + Clone + 'a,
I: clock::Instant,
H: BuildHasher,
{
Duration::from_millis(self.select_timeout_ms(tracker, destinations, attempt, now))
}
pub fn select_timeout_ms<'a, D, I, H, const N: usize>(
&self,
tracker: &mut LatencyTracker<D, I, H, N>,
destinations: impl IntoIterator<Item = &'a D>,
attempt: u32,
now: I,
) -> u64
where
D: Hash + Eq + Clone + 'a,
I: clock::Instant,
H: BuildHasher,
{
let multiplier = Self::attempt_multiplier(attempt);
let floor = self.config.backoff.min_ms.get() as u64;
let ceiling = self.config.backoff.max_ms.get() as u64;
let fallback = (floor * multiplier).min(ceiling);
let mut selected = fallback;
for dest in destinations.into_iter() {
if let Some(estimate_ms) = tracker.quantile_ms(dest, self.config.quantile, now) {
let adaptive_ms = self.compute_adaptive_ms(estimate_ms, multiplier);
let clamped = adaptive_ms.max(floor).min(ceiling);
selected = selected.max(clamped);
}
}
selected
}
#[inline]
pub fn exponential_backoff(&self, attempt: u32) -> Duration {
Duration::from_millis(self.exponential_backoff_ms(attempt))
}
#[inline]
pub fn exponential_backoff_ms(&self, attempt: u32) -> u64 {
let multiplier = Self::attempt_multiplier(attempt);
let base = self.config.backoff.min_ms.get() as u64;
let ceiling = self.config.backoff.max_ms.get() as u64;
(base * multiplier).min(ceiling)
}
#[inline]
fn attempt_multiplier(attempt: u32) -> u64 {
let exponent = attempt.saturating_sub(1).min(20);
1u64 << exponent
}
#[inline]
fn compute_adaptive_ms(&self, estimate_ms: u64, multiplier: u64) -> u64 {
let base = estimate_ms.saturating_mul(multiplier);
(self.config.safety_factor * base as f64) as u64
}
#[inline]
pub fn config(&self) -> &TimeoutConfig {
&self.config
}
#[cfg(feature = "sync")]
#[inline]
pub fn select_timeout_sync<'a, D, I, H, const N: usize>(
&self,
tracker: &crate::sync_tracker::SyncLatencyTracker<D, I, H, N>,
destinations: impl IntoIterator<Item = &'a D>,
attempt: u32,
now: I,
) -> Duration
where
D: Hash + Eq + Clone + Send + Sync + 'a,
I: clock::Instant,
H: BuildHasher + Clone,
{
Duration::from_millis(self.select_timeout_sync_ms(tracker, destinations, attempt, now))
}
#[cfg(feature = "sync")]
pub fn select_timeout_sync_ms<'a, D, I, H, const N: usize>(
&self,
tracker: &crate::sync_tracker::SyncLatencyTracker<D, I, H, N>,
destinations: impl IntoIterator<Item = &'a D>,
attempt: u32,
now: I,
) -> u64
where
D: Hash + Eq + Clone + Send + Sync + 'a,
I: clock::Instant,
H: BuildHasher + Clone,
{
let multiplier = Self::attempt_multiplier(attempt);
let floor = self.config.backoff.min_ms.get() as u64;
let ceiling = self.config.backoff.max_ms.get() as u64;
let fallback = (floor * multiplier).min(ceiling);
let mut selected = fallback;
for dest in destinations.into_iter() {
if let Some(estimate_ms) = tracker.quantile_ms(dest, self.config.quantile, now) {
let adaptive_ms = self.compute_adaptive_ms(estimate_ms, multiplier);
let clamped = adaptive_ms.max(floor).min(ceiling);
selected = selected.max(clamped);
}
}
selected
}
}
#[cfg(test)]
mod tests {
use std::time::Instant;
use super::*;
use crate::config::TrackerConfig;
use crate::parse::BackoffInterval;
fn make_tracker_and_timeout<I: clock::Instant>() -> (LatencyTracker<u32, I>, AdaptiveTimeout) {
let tracker_config = TrackerConfig {
min_samples: 5,
..TrackerConfig::default()
};
let timeout_config = TimeoutConfig {
backoff: "10ms..60s".parse::<BackoffInterval>().unwrap(),
quantile: 0.99,
safety_factor: 2.0,
};
(
LatencyTracker::new(tracker_config),
AdaptiveTimeout::new(timeout_config),
)
}
#[test]
fn fallback_exponential_backoff_no_data() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
let t1 = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
assert_eq!(t1, Duration::from_millis(10));
let t2 = timeout.select_timeout(&mut tracker, &[1u32], 2, now);
assert_eq!(t2, Duration::from_millis(20));
let t3 = timeout.select_timeout(&mut tracker, &[1u32], 3, now);
assert_eq!(t3, Duration::from_millis(40));
}
#[test]
fn exponential_backoff_capped_at_max() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
let t = timeout.select_timeout(&mut tracker, &[1u32], 100, now);
assert_eq!(t, Duration::from_secs(60));
}
#[test]
fn adaptive_timeout_with_data() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(50), now);
}
let t = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
assert_eq!(t, Duration::from_millis(100));
}
#[test]
fn adaptive_timeout_respects_floor() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(1), now);
}
let t = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
assert_eq!(t, Duration::from_millis(10));
}
#[test]
fn adaptive_timeout_respects_ceiling() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(50_000), now);
}
let t = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
assert_eq!(t, Duration::from_secs(60));
}
#[test]
fn max_across_destinations() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(10), now);
tracker.record_latency(&2u32, Duration::from_millis(500), now);
}
let t = timeout.select_timeout(&mut tracker, &[1u32, 2u32], 1, now);
assert!(
t >= Duration::from_millis(990) && t <= Duration::from_millis(1010),
"timeout was {t:?}"
);
}
#[test]
fn attempt_multiplier_increases_timeout() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(50), now);
}
let t1 = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
let t2 = timeout.select_timeout(&mut tracker, &[1u32], 2, now);
let t3 = timeout.select_timeout(&mut tracker, &[1u32], 3, now);
assert_eq!(t1, Duration::from_millis(100));
assert_eq!(t2, Duration::from_millis(200));
assert_eq!(t3, Duration::from_millis(400));
}
#[test]
fn mixed_data_and_no_data_destinations() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(50), now);
}
let t = timeout.select_timeout(&mut tracker, &[1u32, 2u32], 1, now);
assert_eq!(t, Duration::from_millis(100));
}
#[test]
fn ms_variants_match_duration_variants() {
let now = Instant::now();
let (mut tracker, timeout) = make_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(50), now);
}
let dur = timeout.select_timeout(&mut tracker, &[1u32], 1, now);
let ms = timeout.select_timeout_ms(&mut tracker, &[1u32], 1, now);
assert_eq!(dur, Duration::from_millis(ms));
let dur_fb = timeout.exponential_backoff(3);
let ms_fb = timeout.exponential_backoff_ms(3);
assert_eq!(dur_fb, Duration::from_millis(ms_fb));
}
#[cfg(feature = "sync")]
mod sync_tests {
use std::time::{Duration, Instant};
use crate::config::{TimeoutConfig, TrackerConfig};
use crate::parse::BackoffInterval;
use crate::sync_tracker::SyncLatencyTracker;
use crate::timeout::AdaptiveTimeout;
fn make_sync_tracker_and_timeout() -> (SyncLatencyTracker<u32>, AdaptiveTimeout) {
let tracker_config = TrackerConfig {
min_samples: 5,
..TrackerConfig::default()
};
let timeout_config = TimeoutConfig {
backoff: "10ms..60s".parse::<BackoffInterval>().unwrap(),
quantile: 0.99,
safety_factor: 2.0,
};
(
SyncLatencyTracker::new(tracker_config),
AdaptiveTimeout::new(timeout_config),
)
}
#[test]
fn sync_fallback_exponential_backoff_no_data() {
let now = Instant::now();
let (tracker, timeout) = make_sync_tracker_and_timeout();
let t1 = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
assert_eq!(t1, Duration::from_millis(10));
let t2 = timeout.select_timeout_sync(&tracker, &[1u32], 2, now);
assert_eq!(t2, Duration::from_millis(20));
let t3 = timeout.select_timeout_sync(&tracker, &[1u32], 3, now);
assert_eq!(t3, Duration::from_millis(40));
}
#[test]
fn sync_adaptive_timeout_with_data() {
let now = Instant::now();
let (tracker, timeout) = make_sync_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(50), now);
}
let t = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
assert_eq!(t, Duration::from_millis(100));
}
#[test]
fn sync_respects_floor_and_ceiling() {
let now = Instant::now();
let (tracker, timeout) = make_sync_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(1), now);
}
let t = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
assert_eq!(t, Duration::from_millis(10));
for _ in 0..100 {
tracker.record_latency(&2u32, Duration::from_millis(50_000), now);
}
let t = timeout.select_timeout_sync(&tracker, &[2u32], 1, now);
assert_eq!(t, Duration::from_secs(60));
}
#[test]
fn sync_max_across_destinations() {
let now = Instant::now();
let (tracker, timeout) = make_sync_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(10), now);
tracker.record_latency(&2u32, Duration::from_millis(500), now);
}
let t = timeout.select_timeout_sync(&tracker, &[1u32, 2u32], 1, now);
assert!(
t >= Duration::from_millis(990) && t <= Duration::from_millis(1010),
"timeout was {t:?}"
);
}
#[test]
fn sync_ms_variants_match_duration_variants() {
let now = Instant::now();
let (tracker, timeout) = make_sync_tracker_and_timeout();
for _ in 0..100 {
tracker.record_latency(&1u32, Duration::from_millis(50), now);
}
let dur = timeout.select_timeout_sync(&tracker, &[1u32], 1, now);
let ms = timeout.select_timeout_sync_ms(&tracker, &[1u32], 1, now);
assert_eq!(dur, Duration::from_millis(ms));
}
#[test]
fn sync_matches_mutable_tracker_results() {
use crate::tracker::LatencyTracker;
let now = Instant::now();
let tracker_config = TrackerConfig {
min_samples: 5,
..TrackerConfig::default()
};
let timeout_config = TimeoutConfig {
backoff: "10ms..60s".parse::<BackoffInterval>().unwrap(),
quantile: 0.99,
safety_factor: 2.0,
};
let mut mutable_tracker = LatencyTracker::<u32, Instant>::new(tracker_config);
let sync_tracker = SyncLatencyTracker::<u32>::new(tracker_config);
let timeout = AdaptiveTimeout::new(timeout_config);
for _ in 0..100 {
mutable_tracker.record_latency(&1u32, Duration::from_millis(50), now);
sync_tracker.record_latency(&1u32, Duration::from_millis(50), now);
}
let ms_mut = timeout.select_timeout_ms(&mut mutable_tracker, &[1u32], 1, now);
let ms_sync = timeout.select_timeout_sync_ms(&sync_tracker, &[1u32], 1, now);
assert_eq!(ms_mut, ms_sync);
}
}
}