use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use super::config::{MIN_RTT_FILTER_WINDOW, PROBE_RTT_DURATION, PROBE_RTT_INTERVAL};
pub(crate) struct RttTracker {
min_rtt_nanos: AtomicU64,
min_rtt_stamp_nanos: AtomicU64,
last_probe_rtt_nanos: AtomicU64,
next_probe_rtt_nanos: AtomicU64,
probe_rtt_entry_nanos: AtomicU64,
probe_rtt_round_done: AtomicBool,
filter_window_nanos: u64,
probe_interval_nanos: u64,
probe_duration_nanos: u64,
}
impl RttTracker {
pub(crate) fn new() -> Self {
Self::with_params(
MIN_RTT_FILTER_WINDOW,
PROBE_RTT_INTERVAL,
PROBE_RTT_DURATION,
)
}
pub(crate) fn with_params(
filter_window: Duration,
probe_interval: Duration,
probe_duration: Duration,
) -> Self {
Self {
min_rtt_nanos: AtomicU64::new(u64::MAX),
min_rtt_stamp_nanos: AtomicU64::new(0),
last_probe_rtt_nanos: AtomicU64::new(0),
next_probe_rtt_nanos: AtomicU64::new(0),
probe_rtt_entry_nanos: AtomicU64::new(0),
probe_rtt_round_done: AtomicBool::new(false),
filter_window_nanos: filter_window.as_nanos() as u64,
probe_interval_nanos: probe_interval.as_nanos() as u64,
probe_duration_nanos: probe_duration.as_nanos() as u64,
}
}
pub(crate) fn update(&self, rtt: Duration, now_nanos: u64) -> bool {
let rtt_nanos = rtt.as_nanos() as u64;
let current_min = self.min_rtt_nanos.load(Ordering::Acquire);
let current_stamp = self.min_rtt_stamp_nanos.load(Ordering::Acquire);
let expired = now_nanos.saturating_sub(current_stamp) > self.filter_window_nanos;
if rtt_nanos < current_min || expired {
self.min_rtt_nanos.store(rtt_nanos, Ordering::Release);
self.min_rtt_stamp_nanos.store(now_nanos, Ordering::Release);
return true;
}
false
}
pub(crate) fn min_rtt(&self) -> Option<Duration> {
let nanos = self.min_rtt_nanos.load(Ordering::Acquire);
if nanos == u64::MAX {
None
} else {
Some(Duration::from_nanos(nanos))
}
}
pub(crate) fn min_rtt_nanos(&self) -> u64 {
self.min_rtt_nanos.load(Ordering::Acquire)
}
pub(crate) fn is_expired(&self, now_nanos: u64) -> bool {
let stamp = self.min_rtt_stamp_nanos.load(Ordering::Acquire);
now_nanos.saturating_sub(stamp) > self.filter_window_nanos
}
pub(crate) fn should_enter_probe_rtt(&self, now_nanos: u64) -> bool {
if self.is_expired(now_nanos) {
return true;
}
let next = self.next_probe_rtt_nanos.load(Ordering::Acquire);
next > 0 && now_nanos >= next
}
pub(crate) fn enter_probe_rtt(&self, now_nanos: u64) {
self.probe_rtt_entry_nanos
.store(now_nanos, Ordering::Release);
self.probe_rtt_round_done.store(false, Ordering::Release);
}
pub(crate) fn mark_probe_rtt_round_done(&self) {
self.probe_rtt_round_done.store(true, Ordering::Release);
}
pub(crate) fn should_exit_probe_rtt(&self, now_nanos: u64) -> bool {
let entry = self.probe_rtt_entry_nanos.load(Ordering::Acquire);
let elapsed = now_nanos.saturating_sub(entry);
let round_done = self.probe_rtt_round_done.load(Ordering::Acquire);
elapsed >= self.probe_duration_nanos && round_done
}
pub(crate) fn exit_probe_rtt(&self, now_nanos: u64, jitter_nanos: i64) {
self.last_probe_rtt_nanos
.store(now_nanos, Ordering::Release);
let next = now_nanos
.saturating_add(self.probe_interval_nanos)
.saturating_add_signed(jitter_nanos);
self.next_probe_rtt_nanos.store(next, Ordering::Release);
}
}
impl Default for RttTracker {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for RttTracker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let min_rtt = self.min_rtt();
f.debug_struct("RttTracker")
.field("min_rtt", &min_rtt)
.field(
"min_rtt_stamp_nanos",
&self.min_rtt_stamp_nanos.load(Ordering::Relaxed),
)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_rtt_tracker_basic() {
let tracker = RttTracker::new();
assert!(tracker.min_rtt().is_none());
let updated = tracker.update(Duration::from_millis(50), 1_000_000_000);
assert!(updated);
assert_eq!(tracker.min_rtt(), Some(Duration::from_millis(50)));
}
#[test]
fn test_rtt_tracker_keeps_minimum() {
let tracker = RttTracker::new();
tracker.update(Duration::from_millis(100), 1_000_000_000);
tracker.update(Duration::from_millis(50), 2_000_000_000);
tracker.update(Duration::from_millis(75), 3_000_000_000);
assert_eq!(tracker.min_rtt(), Some(Duration::from_millis(50)));
}
#[test]
fn test_rtt_tracker_expiration() {
let tracker = RttTracker::with_params(
Duration::from_secs(5),
Duration::from_secs(5),
Duration::from_millis(200),
);
tracker.update(Duration::from_millis(50), 0);
assert!(!tracker.is_expired(4_000_000_000));
assert!(tracker.is_expired(6_000_000_000)); }
#[test]
fn test_rtt_tracker_expired_update() {
let tracker = RttTracker::with_params(
Duration::from_secs(5),
Duration::from_secs(5),
Duration::from_millis(200),
);
tracker.update(Duration::from_millis(50), 0);
let updated = tracker.update(Duration::from_millis(100), 6_000_000_000);
assert!(updated);
assert_eq!(tracker.min_rtt(), Some(Duration::from_millis(100)));
}
#[test]
fn test_probe_rtt_scheduling() {
let tracker = RttTracker::with_params(
Duration::from_secs(10),
Duration::from_secs(5),
Duration::from_millis(200),
);
assert!(!tracker.should_enter_probe_rtt(1_000_000_000));
tracker.update(Duration::from_millis(50), 0);
assert!(tracker.should_enter_probe_rtt(11_000_000_000)); }
#[test]
fn test_probe_rtt_lifecycle() {
let tracker = RttTracker::with_params(
Duration::from_secs(10),
Duration::from_secs(5),
Duration::from_millis(200),
);
tracker.enter_probe_rtt(1_000_000_000);
assert!(!tracker.should_exit_probe_rtt(1_100_000_000));
tracker.mark_probe_rtt_round_done();
assert!(!tracker.should_exit_probe_rtt(1_100_000_000));
assert!(tracker.should_exit_probe_rtt(1_250_000_000));
tracker.exit_probe_rtt(1_250_000_000, 0);
let next = tracker.next_probe_rtt_nanos.load(Ordering::Acquire);
assert_eq!(next, 1_250_000_000 + 5_000_000_000);
}
}