use {
super::PeerEntry,
crate::network::PeerId,
core::{ops::Deref, time::Duration},
dashmap::DashMap,
iroh::endpoint::Connection,
std::time::Instant,
};
/// Per-peer RTT estimator state, created and updated by
/// `RttTracker::record_sample`.
struct RttState {
/// Exponentially weighted moving average of the recorded samples. Seeded
/// with the first sample; each later sample contributes with weight 1/8.
smoothed: Duration,
/// Smoothed deviation of samples from `smoothed`. Seeded with half of the
/// first sample; each later deviation contributes with weight 1/4.
variance: Duration,
/// Number of samples folded into this state (starts at 1).
sample_count: u64,
/// Instant at which the most recent sample was recorded.
last_sample: Instant,
}
/// A round-trip time value, expressed as a [`Duration`].
pub type Rtt = Duration;
/// Keeps a smoothed round-trip-time estimate for each peer, keyed by
/// [`PeerId`].
pub struct RttTracker {
/// Estimator state for every peer that has had at least one sample
/// recorded and has not been removed since.
state: DashMap<PeerId, RttState>,
}
/// Debug output shows only how many peers are tracked, not the per-peer
/// estimator state.
impl core::fmt::Debug for RttTracker {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let peers_tracked = self.state.len();

        let mut out = f.debug_struct("RttTracker");
        out.field("peers_tracked", &peers_tracked);
        out.finish()
    }
}
impl RttTracker {
    /// Creates a tracker with no peers recorded.
    pub fn new() -> Self {
        Self {
            state: DashMap::new(),
        }
    }

    /// Folds one RTT measurement for `peer_id` into that peer's estimate.
    ///
    /// The first sample for a peer seeds the state with `smoothed = rtt` and
    /// `variance = rtt / 2`. Every later sample updates it as
    ///
    /// ```text
    /// variance = 3/4 * variance + 1/4 * |smoothed - rtt|
    /// smoothed = 7/8 * smoothed + 1/8 * rtt
    /// ```
    ///
    /// which uses the same weights and update order as the SRTT/RTTVAR
    /// computation in RFC 6298.
    pub fn record_sample(&self, peer_id: PeerId, rtt: Duration) {
        use dashmap::Entry;

        match self.state.entry(peer_id) {
            Entry::Vacant(slot) => {
                slot.insert(RttState {
                    smoothed: rtt,
                    variance: rtt / 2,
                    sample_count: 1,
                    last_sample: Instant::now(),
                });
            }
            Entry::Occupied(mut slot) => {
                let state = slot.get_mut();

                // The deviation must be measured against the estimate as it
                // was *before* this sample is blended in, so the variance is
                // updated first.
                let deviation = state.smoothed.abs_diff(rtt);
                state.variance =
                    mul_duration(state.variance, 3, 4) + mul_duration(deviation, 1, 4);
                state.smoothed =
                    mul_duration(state.smoothed, 7, 8) + mul_duration(rtt, 1, 8);

                state.sample_count += 1;
                state.last_sample = Instant::now();
            }
        }
    }

    /// Returns the current smoothed RTT for `peer_id`, or `None` when the
    /// tracker holds no state for that peer.
    pub fn get(&self, peer_id: &PeerId) -> Option<Rtt> {
        self.state.get(peer_id).map(|entry| entry.value().smoothed)
    }

    /// Discards all state held for `peer_id`. Does nothing if the peer is
    /// not tracked.
    pub fn remove(&self, peer_id: &PeerId) {
        self.state.remove(peer_id);
    }
}

/// An empty tracker, identical to [`RttTracker::new`].
impl Default for RttTracker {
    fn default() -> Self {
        Self::new()
    }
}
/// Returns the RTT reported by the first path of `connection` that is marked
/// as selected.
///
/// Yields `None` when no path is selected, or when the selected path has no
/// RTT to report.
pub fn best_rtt(connection: &Connection) -> Option<Duration> {
    // Kept as a single expression so the path list lives for the whole lookup.
    connection
        .paths()
        .into_iter()
        .find(|path| path.is_selected())?
        .rtt()
}
/// A [`PeerEntry`] paired with the smoothed RTT that was known for that peer
/// when the value was built.
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// The wrapped peer entry.
entry: PeerEntry,
/// Smoothed RTT for the peer, or `None` if the tracker had no data for it.
rtt: Option<Rtt>,
}
impl PeerInfo {
    /// Builds a `PeerInfo` from a clone of `entry` plus whatever smoothed RTT
    /// `tracker` currently holds for the entry's id.
    pub(crate) fn from_tracker(entry: &PeerEntry, tracker: &RttTracker) -> Self {
        Self {
            rtt: tracker.get(entry.id()),
            entry: entry.clone(),
        }
    }

    /// The smoothed RTT captured at construction time, if there was one.
    pub const fn rtt(&self) -> Option<Rtt> {
        self.rtt
    }

    /// Reports whether the peer's RTT is strictly below `threshold`.
    ///
    /// A peer with no RTT data is treated optimistically and passes any
    /// threshold.
    pub fn rtt_below(&self, threshold: Duration) -> bool {
        match self.rtt {
            Some(measured) => measured < threshold,
            None => true,
        }
    }

    /// Consumes the wrapper and returns the underlying [`PeerEntry`],
    /// dropping the RTT.
    pub fn into_entry(self) -> PeerEntry {
        let Self { entry, .. } = self;
        entry
    }
}
/// Exposes the wrapped [`PeerEntry`]'s methods directly on `PeerInfo`.
impl Deref for PeerInfo {
    type Target = PeerEntry;

    fn deref(&self) -> &Self::Target {
        let Self { entry, .. } = self;
        entry
    }
}
impl From<PeerInfo> for PeerEntry {
fn from(info: PeerInfo) -> Self {
info.entry
}
}
/// Borrows the wrapped [`PeerEntry`].
impl AsRef<PeerEntry> for PeerInfo {
    fn as_ref(&self) -> &PeerEntry {
        let Self { entry, .. } = self;
        entry
    }
}
/// Scales `d` by the fraction `num / den`.
///
/// The product `d * num` is computed first so the result is exact down to
/// the nanosecond truncation of the final division. If that product does not
/// fit in a [`Duration`], the division is performed first instead (losing at
/// most `num` nanoseconds of precision) and the multiplication saturates, so
/// very large inputs never panic.
///
/// # Panics
///
/// Panics if `den` is zero.
fn mul_duration(d: Duration, num: u32, den: u32) -> Duration {
    match d.checked_mul(num) {
        Some(scaled) => scaled / den,
        // `d * num` overflowed; divide first so the intermediate value
        // stays representable.
        None => (d / den).saturating_mul(num),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Produces the public half of a freshly generated random key.
    fn test_peer_id() -> PeerId {
        iroh::SecretKey::generate(&mut rand::rng()).public()
    }

    /// Produces an endpoint address for a freshly generated random key.
    fn random_address() -> iroh::EndpointAddr {
        iroh::EndpointAddr::from(test_peer_id())
    }

    #[test]
    fn first_sample_initializes_state() {
        let tracker = RttTracker::new();
        let peer = test_peer_id();

        tracker.record_sample(peer, Duration::from_millis(100));

        let smoothed = tracker.get(&peer).expect("should have RTT data");
        assert_eq!(smoothed, Duration::from_millis(100));
    }

    #[test]
    fn ewma_converges_toward_samples() {
        let tracker = RttTracker::new();
        let peer = test_peer_id();

        // Seed at 100ms, then feed a steady stream of 200ms samples.
        tracker.record_sample(peer, Duration::from_millis(100));
        (0..50).for_each(|_| tracker.record_sample(peer, Duration::from_millis(200)));

        let rtt = tracker.get(&peer).unwrap();
        assert!(
            rtt > Duration::from_millis(195),
            "smoothed RTT should converge toward 200ms, got {rtt:?}",
        );
    }

    #[test]
    fn remove_clears_state() {
        let tracker = RttTracker::new();
        let peer = test_peer_id();

        tracker.record_sample(peer, Duration::from_millis(100));
        assert!(tracker.get(&peer).is_some());

        tracker.remove(&peer);
        assert!(tracker.get(&peer).is_none());
    }

    #[test]
    fn peer_info_rtt_below_optimistic_for_none() {
        let tracker = RttTracker::new();
        let entry = PeerEntry::new(crate::NetworkId::random(), random_address());

        // Nothing was recorded for this peer, so any threshold passes.
        let info = PeerInfo::from_tracker(&entry, &tracker);
        assert!(info.rtt().is_none());
        assert!(info.rtt_below(Duration::from_millis(1)));
    }

    #[test]
    fn peer_info_rtt_below_checks_threshold() {
        let tracker = RttTracker::new();
        let entry = PeerEntry::new(crate::NetworkId::random(), random_address());
        tracker.record_sample(*entry.id(), Duration::from_millis(150));

        let info = PeerInfo::from_tracker(&entry, &tracker);
        assert!(info.rtt_below(Duration::from_millis(200)));
        assert!(!info.rtt_below(Duration::from_millis(100)));
    }

    #[test]
    fn peer_info_derefs_to_peer_entry() {
        let tracker = RttTracker::new();
        let entry =
            PeerEntry::new(crate::NetworkId::random(), random_address()).add_tags("test-tag");

        let info = PeerInfo::from_tracker(&entry, &tracker);
        assert_eq!(info.tags(), entry.tags());
        assert_eq!(info.id(), entry.id());
        assert_eq!(info.network_id(), entry.network_id());
    }
}