use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use dashmap::DashMap;
use crate::simulation::TimeSource;
/// Span over which the minimum (baseline) RTT is taken; older samples are pruned on record.
const BASELINE_WINDOW: Duration = Duration::from_secs(5 * 60);
/// Span over which the recent median RTT is computed.
const RECENT_WINDOW: Duration = Duration::from_secs(10);
/// Hard cap on stored samples per connection; the oldest samples are evicted first.
const MAX_SAMPLES: usize = 1 << 15;
/// Point-in-time summary of one connection's RTT history.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct RttSnapshot {
    /// Minimum RTT among samples inside the baseline window (`BASELINE_WINDOW`).
    pub baseline_min: Option<Duration>,
    /// Median RTT (upper middle for an even count) among samples inside the recent
    /// window (`RECENT_WINDOW`); `None` when no sample falls in that window.
    pub recent_median: Option<Duration>,
    /// `recent_median - baseline_min`, saturating at zero; `None` if either side is missing.
    pub inflation: Option<Duration>,
    /// Number of samples inside the baseline window.
    pub baseline_samples: usize,
    /// Number of samples inside the recent window.
    pub recent_samples: usize,
}
/// Time-windowed RTT sample buffer for a single connection.
pub(crate) struct RollingRttStats<T: TimeSource> {
    /// Clock used to timestamp samples and to compute window cutoffs.
    time_source: T,
    /// Sample buffer; both `record` and `snapshot` lock this mutex to access it.
    inner: parking_lot::Mutex<Inner>,
}
/// Mutable state guarded by the mutex inside `RollingRttStats`.
struct Inner {
    /// `(timestamp_nanos, rtt_nanos)` pairs in insertion order: pushed at the back,
    /// evicted from the front.
    samples: VecDeque<(u64, u64)>,
}
impl<T: TimeSource> RollingRttStats<T> {
    /// Creates an empty tracker whose sample timestamps come from `time_source`.
    pub(crate) fn new(time_source: T) -> Self {
        Self {
            time_source,
            inner: parking_lot::Mutex::new(Inner {
                samples: VecDeque::with_capacity(1024),
            }),
        }
    }

    /// Records one RTT sample stamped with the current time.
    ///
    /// RTTs that do not fit in `u64` nanoseconds are clamped to `u64::MAX`. After the
    /// sample is appended, entries older than `BASELINE_WINDOW` are pruned from the
    /// front and the buffer is trimmed to at most `MAX_SAMPLES` by evicting the oldest
    /// entries.
    pub(crate) fn record(&self, rtt: Duration) {
        let rtt_nanos = u64::try_from(rtt.as_nanos()).unwrap_or(u64::MAX);
        let mut inner = self.inner.lock();
        // The clock is read while holding the lock so concurrent recorders append in
        // timestamp order (assuming the time source is monotonic); the front-only
        // pruning below relies on that ordering.
        let now_nanos = self.time_source.now_nanos();
        let cutoff = now_nanos.saturating_sub(BASELINE_WINDOW.as_nanos() as u64);
        inner.samples.push_back((now_nanos, rtt_nanos));
        while inner.samples.front().is_some_and(|&(ts, _)| ts < cutoff) {
            inner.samples.pop_front();
        }
        while inner.samples.len() > MAX_SAMPLES {
            inner.samples.pop_front();
        }
    }

    /// Summarizes the samples currently inside the baseline and recent windows.
    ///
    /// Returns `None` when no stored sample lies inside the baseline window (including
    /// when nothing has been recorded). Otherwise `baseline_min` is always `Some`,
    /// `recent_median` is the upper-middle RTT of the recent window (or `None` if that
    /// window is empty), and `inflation` is `recent_median - baseline_min` saturating
    /// at zero.
    pub(crate) fn snapshot(&self) -> Option<RttSnapshot> {
        let now_nanos = self.time_source.now_nanos();
        let baseline_cutoff = now_nanos.saturating_sub(BASELINE_WINDOW.as_nanos() as u64);
        let recent_cutoff = now_nanos.saturating_sub(RECENT_WINDOW.as_nanos() as u64);

        let mut baseline_min: Option<u64> = None;
        let mut baseline_samples = 0usize;
        let mut recent: Vec<u64> = Vec::new();
        {
            // Hold the lock only for the scan. The median selection below works on the
            // copied `recent` values so it does not block concurrent `record` calls.
            let inner = self.inner.lock();
            for &(ts, rtt) in inner.samples.iter() {
                if ts < baseline_cutoff {
                    continue;
                }
                baseline_samples += 1;
                baseline_min = Some(baseline_min.map_or(rtt, |m| m.min(rtt)));
                if ts >= recent_cutoff {
                    recent.push(rtt);
                }
            }
        }

        // `None` here means no sample lies inside the baseline window.
        let baseline_min = Duration::from_nanos(baseline_min?);
        let recent_samples = recent.len();
        let recent_median = if recent.is_empty() {
            None
        } else {
            // Selects the same element a full sort would place at `len / 2` (the upper
            // middle for even lengths), in linear time.
            let mid = recent.len() / 2;
            let (_, median, _) = recent.select_nth_unstable(mid);
            Some(Duration::from_nanos(*median))
        };
        let inflation = recent_median.map(|r| r.saturating_sub(baseline_min));
        Some(RttSnapshot {
            baseline_min: Some(baseline_min),
            recent_median,
            inflation,
            baseline_samples,
            recent_samples,
        })
    }

    /// Number of samples currently held in the buffer, regardless of window (test-only).
    #[cfg(test)]
    pub(crate) fn stored_samples(&self) -> usize {
        self.inner.lock().samples.len()
    }
}
/// Type-erased source of [`RttSnapshot`]s; this is the value type stored in
/// `SHADOW_RTT_REGISTRY`.
pub(crate) trait RttSnapshotProvider: Send + Sync {
    /// Current snapshot, or `None` if there is nothing to report.
    fn snapshot(&self) -> Option<RttSnapshot>;
}
impl<T: TimeSource> RttSnapshotProvider for RollingRttStats<T> {
fn snapshot(&self) -> Option<RttSnapshot> {
Self::snapshot(self)
}
}
/// Process-wide map from remote address to that connection's RTT snapshot provider.
///
/// Entries are inserted by `RollingRttStatsHandle::new` and removed when the handle that
/// inserted them is dropped.
pub(crate) static SHADOW_RTT_REGISTRY: LazyLock<DashMap<SocketAddr, Arc<dyn RttSnapshotProvider>>> =
    LazyLock::new(DashMap::new);
/// Owning handle for one connection's `RollingRttStats`. The stats stay registered in
/// `SHADOW_RTT_REGISTRY` while the handle lives.
pub(crate) struct RollingRttStatsHandle<T: TimeSource> {
    /// Concretely typed stats, used for recording samples.
    stats: Arc<RollingRttStats<T>>,
    /// Same allocation as `stats`, type-erased. This exact pointer is what was inserted
    /// into the registry, and it is compared by identity on drop.
    erased: Arc<dyn RttSnapshotProvider>,
    /// Registry key for this connection.
    remote_addr: SocketAddr,
}
impl<T: TimeSource> RollingRttStatsHandle<T> {
    /// Creates fresh stats for `remote_addr` and publishes them in `SHADOW_RTT_REGISTRY`.
    ///
    /// Any provider already registered under the same address is replaced. Deregistration
    /// on drop compares pointer identity, so the replaced handle's drop leaves this new
    /// entry in place.
    pub(crate) fn new(remote_addr: SocketAddr, time_source: T) -> Self {
        let stats = Arc::new(RollingRttStats::new(time_source));
        let erased = Arc::clone(&stats) as Arc<dyn RttSnapshotProvider>;
        SHADOW_RTT_REGISTRY.insert(remote_addr, Arc::clone(&erased));
        Self {
            remote_addr,
            stats,
            erased,
        }
    }

    /// Records one RTT sample for this connection.
    pub(crate) fn record(&self, rtt: Duration) {
        self.stats.record(rtt)
    }
}
impl<T: TimeSource> Drop for RollingRttStatsHandle<T> {
    /// Deregisters this handle's stats, but only if the registry entry for the address is
    /// still the exact provider this handle inserted. A newer handle for the same address
    /// keeps its entry.
    fn drop(&mut self) {
        let ours = &self.erased;
        SHADOW_RTT_REGISTRY.remove_if(&self.remote_addr, |_addr, registered| {
            Arc::ptr_eq(registered, ours)
        });
    }
}
pub(crate) fn registry_snapshot() -> Vec<(SocketAddr, RttSnapshot)> {
SHADOW_RTT_REGISTRY
.iter()
.filter_map(|kv| kv.value().snapshot().map(|s| (*kv.key(), s)))
.collect()
}
/// Median per-connection RTT inflation across every registered peer.
///
/// Only peers whose snapshot defines an inflation contribute. With an even number of
/// contributors the upper of the two middle values is returned. Returns `None` when no
/// peer contributes.
pub(crate) fn cross_connection_median_inflation() -> Option<Duration> {
    let mut inflations: Vec<Duration> = registry_snapshot()
        .into_iter()
        .filter_map(|(_, snap)| snap.inflation)
        .collect();
    if inflations.is_empty() {
        return None;
    }
    let middle = inflations.len() / 2;
    let (_, median, _) = inflations.select_nth_unstable(middle);
    Some(*median)
}
/// Period between aggregate emissions.
const AGGREGATOR_INTERVAL: Duration = Duration::from_secs(1);

/// Spawns the background task that periodically emits aggregate shadow-RTT metrics.
///
/// The task ticks every `AGGREGATOR_INTERVAL`. The interval's immediate first tick is
/// consumed, so the first emission happens one period after start. The task loops without
/// a break and is registered with `monitor` under the name `shadow_rtt_aggregator`.
pub(crate) fn spawn_aggregator(
    local_peer_id: String,
    monitor: &crate::node::background_task_monitor::BackgroundTaskMonitor,
) {
    let aggregator = async move {
        let mut interval = tokio::time::interval(AGGREGATOR_INTERVAL);
        // After a late tick, schedule the next one a full period later instead of
        // firing a burst of catch-up ticks.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // The first tick resolves immediately; skip it.
        interval.tick().await;
        loop {
            interval.tick().await;
            emit_aggregate_snapshot(&local_peer_id);
        }
    };
    monitor.register("shadow_rtt_aggregator", tokio::spawn(aggregator));
}
fn emit_aggregate_snapshot(local_peer_id: &str) {
let snap = registry_snapshot();
let active_peers = snap.len();
if active_peers == 0 {
return;
}
let peers_with_recent = snap
.iter()
.filter(|(_, s)| s.recent_median.is_some())
.count();
let median_inflation_us = cross_connection_median_inflation().map(|d| d.as_micros() as u64);
for (addr, s) in &snap {
tracing::trace!(
target: "freenet::transport::shadow_rtt",
peer = %addr,
baseline_min_us = s.baseline_min.map(|d| d.as_micros() as u64),
recent_median_us = s.recent_median.map(|d| d.as_micros() as u64),
inflation_us = s.inflation.map(|d| d.as_micros() as u64),
baseline_samples = s.baseline_samples,
recent_samples = s.recent_samples,
"shadow_rtt_per_peer"
);
}
tracing::debug!(
target: "freenet::transport::shadow_rtt",
active_peers,
peers_with_recent,
median_inflation_us,
"shadow_rtt_aggregate"
);
crate::tracing::telemetry::send_standalone_event_with_peer_id(
"shadow_rtt_aggregate",
local_peer_id,
serde_json::json!({
"active_peers": active_peers,
"peers_with_recent": peers_with_recent,
"median_inflation_us": median_inflation_us,
}),
);
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::simulation::VirtualTime;

    fn dur_ms(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }

    /// Builds a stats tracker together with the `VirtualTime` it was created from, so a
    /// test can advance the tracker's clock.
    fn new_stats() -> (VirtualTime, RollingRttStats<VirtualTime>) {
        let ts = VirtualTime::new();
        let stats = RollingRttStats::new(ts.clone());
        (ts, stats)
    }

    #[test]
    fn empty_snapshot_is_none() {
        let (_ts, stats) = new_stats();
        assert!(stats.snapshot().is_none());
    }

    #[test]
    fn baseline_min_tracks_minimum() {
        let (ts, stats) = new_stats();
        for ms in [50u64, 80, 40, 60, 100] {
            stats.record(dur_ms(ms));
            ts.advance(Duration::from_millis(100));
        }
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(snap.baseline_min, Some(dur_ms(40)));
    }

    #[test]
    fn samples_older_than_baseline_window_are_dropped() {
        let (ts, stats) = new_stats();
        stats.record(dur_ms(10));
        ts.advance(BASELINE_WINDOW + Duration::from_secs(1));
        for ms in [80u64, 90, 100] {
            stats.record(dur_ms(ms));
            ts.advance(Duration::from_millis(100));
        }
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(
            snap.baseline_min,
            Some(dur_ms(80)),
            "the 10 ms outlier should have aged out"
        );
        assert_eq!(snap.baseline_samples, 3);
    }

    #[test]
    fn recent_median_uses_only_last_ten_seconds() {
        let (ts, stats) = new_stats();
        for ms in [40u64, 45, 50] {
            stats.record(dur_ms(ms));
            ts.advance(Duration::from_millis(100));
        }
        ts.advance(Duration::from_secs(60));
        for ms in [120u64, 130, 140] {
            stats.record(dur_ms(ms));
            ts.advance(Duration::from_millis(100));
        }
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(snap.recent_median, Some(dur_ms(130)));
        assert_eq!(snap.recent_samples, 3);
        assert_eq!(snap.baseline_min, Some(dur_ms(40)));
    }

    #[test]
    fn inflation_is_recent_minus_baseline() {
        let (ts, stats) = new_stats();
        for _ in 0..5 {
            stats.record(dur_ms(50));
            ts.advance(Duration::from_millis(200));
        }
        ts.advance(Duration::from_secs(30));
        for _ in 0..5 {
            stats.record(dur_ms(150));
            ts.advance(Duration::from_millis(200));
        }
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(snap.baseline_min, Some(dur_ms(50)));
        assert_eq!(snap.recent_median, Some(dur_ms(150)));
        assert_eq!(snap.inflation, Some(dur_ms(100)));
    }

    #[test]
    fn inflation_saturates_at_zero_when_recent_below_baseline() {
        let (ts, stats) = new_stats();
        for _ in 0..5 {
            stats.record(dur_ms(200));
            ts.advance(Duration::from_millis(200));
        }
        ts.advance(Duration::from_secs(30));
        for _ in 0..5 {
            stats.record(dur_ms(50));
            ts.advance(Duration::from_millis(200));
        }
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(snap.inflation, Some(Duration::ZERO));
    }

    #[test]
    fn recent_window_can_be_empty_while_baseline_has_samples() {
        let (ts, stats) = new_stats();
        for ms in [40u64, 45, 50] {
            stats.record(dur_ms(ms));
            ts.advance(Duration::from_millis(100));
        }
        ts.advance(Duration::from_secs(60));
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(snap.recent_median, None);
        assert!(snap.baseline_min.is_some());
        assert_eq!(snap.inflation, None);
    }

    #[test]
    fn memory_cap_drops_oldest_samples_first() {
        let (ts, stats) = new_stats();
        let total = MAX_SAMPLES + 50;
        // All samples stay well inside the baseline window, so only the cap evicts.
        for i in 0..total {
            let rtt = if i < 10 { dur_ms(10) } else { dur_ms(100) };
            stats.record(rtt);
            ts.advance(Duration::from_micros(100));
        }
        assert_eq!(stats.stored_samples(), MAX_SAMPLES);
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(
            snap.baseline_min,
            Some(dur_ms(100)),
            "the early dur_ms(10) samples should have been evicted by the cap"
        );
    }

    #[test]
    fn snapshot_returns_none_if_only_pre_baseline_samples_remain() {
        let (ts, stats) = new_stats();
        stats.record(dur_ms(50));
        ts.advance(BASELINE_WINDOW + Duration::from_secs(1));
        let snap = stats.snapshot();
        assert!(
            snap.is_none(),
            "stale-only samples should not yield a snapshot"
        );
    }

    /// Documentation-range (192.0.2.0/24) address. Registry tests share the process-wide
    /// registry with concurrently running tests, so each test uses a distinct address.
    fn unique_addr(octet: u8, port: u16) -> SocketAddr {
        use std::net::Ipv4Addr;
        SocketAddr::new(Ipv4Addr::new(192, 0, 2, octet).into(), port)
    }

    #[test]
    fn handle_registers_and_deregisters() {
        let addr = unique_addr(1, 50001);
        let ts = VirtualTime::new();
        let handle = RollingRttStatsHandle::new(addr, ts.clone());
        assert!(SHADOW_RTT_REGISTRY.contains_key(&addr));
        // Registered but sample-less peers yield no snapshot.
        let pre = registry_snapshot();
        assert!(pre.iter().all(|(a, _)| *a != addr));
        handle.record(dur_ms(50));
        ts.advance(Duration::from_millis(200));
        handle.record(dur_ms(60));
        let mid = registry_snapshot();
        assert!(mid.iter().any(|(a, _)| *a == addr));
        drop(handle);
        assert!(!SHADOW_RTT_REGISTRY.contains_key(&addr));
    }

    #[test]
    fn drop_does_not_remove_replacement_entry() {
        let addr = unique_addr(2, 50002);
        let ts = VirtualTime::new();
        let h1 = RollingRttStatsHandle::new(addr, ts.clone());
        let h2 = RollingRttStatsHandle::new(addr, ts.clone());
        assert!(SHADOW_RTT_REGISTRY.contains_key(&addr));
        drop(h1);
        assert!(
            SHADOW_RTT_REGISTRY.contains_key(&addr),
            "h1's drop must not evict h2"
        );
        drop(h2);
        assert!(!SHADOW_RTT_REGISTRY.contains_key(&addr));
    }

    #[test]
    fn cross_connection_median_is_robust_to_one_outlier() {
        let ts = VirtualTime::new();
        let addrs: Vec<SocketAddr> = (10..14u8)
            .map(|i| unique_addr(i, 50100 + i as u16))
            .collect();
        let peers: Vec<_> = addrs
            .iter()
            .map(|a| RollingRttStatsHandle::new(*a, ts.clone()))
            .collect();
        for h in &peers {
            for _ in 0..3 {
                h.record(dur_ms(50));
            }
        }
        ts.advance(Duration::from_secs(30));
        for (i, h) in peers.iter().enumerate() {
            let recent = if i == 3 { dur_ms(500) } else { dur_ms(55) };
            for _ in 0..5 {
                h.record(recent);
            }
        }
        // Restrict to this test's peers: other tests may populate the shared registry.
        let mut my_inflations: Vec<u64> = registry_snapshot()
            .into_iter()
            .filter(|(a, _)| addrs.contains(a))
            .filter_map(|(_, s)| s.inflation.map(|d| d.as_nanos() as u64))
            .collect();
        my_inflations.sort_unstable();
        assert_eq!(
            my_inflations.len(),
            4,
            "all four peers should have inflation"
        );
        let median = Duration::from_nanos(my_inflations[my_inflations.len() / 2]);
        assert!(
            median < dur_ms(50),
            "median {median:?} should be near baseline, not pulled by the 500ms outlier"
        );
        drop(peers);
    }

    #[test]
    fn cross_connection_median_returns_some_when_a_peer_has_inflation() {
        // NOTE(review): this reads the global registry, so peers registered by
        // concurrently running tests also feed the median.
        let ts = VirtualTime::new();
        let addr = unique_addr(30, 50300);
        let h = RollingRttStatsHandle::new(addr, ts.clone());
        for _ in 0..3 {
            h.record(dur_ms(50));
        }
        ts.advance(Duration::from_secs(30));
        for _ in 0..5 {
            h.record(dur_ms(120));
        }
        let median =
            cross_connection_median_inflation().expect("at least one peer has defined inflation");
        assert!(median > Duration::ZERO);
        drop(h);
    }

    #[test]
    fn baseline_boundary_is_strict_less_than() {
        let (ts, stats) = new_stats();
        stats.record(dur_ms(50));
        ts.advance(BASELINE_WINDOW);
        stats.record(dur_ms(80));
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(
            snap.baseline_samples, 2,
            "sample at exact window edge survives"
        );
        ts.advance(Duration::from_nanos(1));
        stats.record(dur_ms(90));
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(snap.baseline_samples, 2, "first sample now aged out");
        assert_eq!(snap.baseline_min, Some(dur_ms(80)));
    }

    #[test]
    fn recent_median_picks_upper_middle_for_even_length() {
        let (ts, stats) = new_stats();
        for ms in [40u64, 50, 60, 70] {
            stats.record(dur_ms(ms));
            ts.advance(Duration::from_millis(100));
        }
        let snap = stats.snapshot().expect("snapshot");
        assert_eq!(snap.recent_median, Some(dur_ms(60)));
    }

    #[tokio::test(start_paused = true)]
    async fn aggregator_emits_periodically() {
        use crate::node::background_task_monitor::BackgroundTaskMonitor;
        let monitor = BackgroundTaskMonitor::new();
        spawn_aggregator("test-peer".to_string(), &monitor);
        let ts = VirtualTime::new();
        let addr = unique_addr(40, 50400);
        let h = RollingRttStatsHandle::new(addr, ts.clone());
        h.record(dur_ms(50));
        tokio::time::advance(AGGREGATOR_INTERVAL * 3 + Duration::from_millis(100)).await;
        tokio::task::yield_now().await;
        // A timeout (rather than an exit) means the aggregator survived several ticks.
        let exit = monitor.wait_for_any_exit();
        tokio::pin!(exit);
        let still_running = tokio::time::timeout(Duration::from_millis(50), &mut exit)
            .await
            .is_err();
        assert!(
            still_running,
            "aggregator task should still be alive after a few ticks"
        );
        drop(h);
    }

    /// Pins the local-log mirror of the aggregate event at DEBUG by inspecting this file's
    /// own source text for the closest macro preceding the log message literal.
    #[test]
    fn shadow_rtt_aggregate_logs_at_debug_pin_test() {
        let src = include_str!("rolling_rtt_stats.rs");
        let needle = "\"shadow_rtt_aggregate\"";
        let idx = src
            .find(needle)
            .expect("shadow_rtt_aggregate log message must still exist in source");
        let preceding = &src[..idx];
        let macro_idx = preceding
            .rfind("tracing::")
            .expect("a tracing macro must precede the shadow_rtt_aggregate log site");
        let line_start = preceding[..macro_idx].rfind('\n').map_or(0, |n| n + 1);
        let line_prefix = &preceding[line_start..macro_idx];
        assert!(
            line_prefix.chars().all(char::is_whitespace),
            "rfind matched `tracing::` inside a string literal or comment, \
             not a macro invocation. Prefix on its line: {line_prefix:?}"
        );
        let after_macro = &preceding[macro_idx + "tracing::".len()..];
        let macro_name = after_macro.split('!').next().unwrap_or("");
        // Move the start forward to a char boundary: slicing a `str` in the middle of a
        // multi-byte character panics, which would abort the test before the assertion
        // whenever non-ASCII text sits within 200 bytes of the log site.
        let mut tail_start = preceding.len().saturating_sub(200);
        while !preceding.is_char_boundary(tail_start) {
            tail_start += 1;
        }
        let tail = &preceding[tail_start..];
        assert_eq!(
            macro_name, "debug",
            "shadow_rtt_aggregate local-log mirror must be at DEBUG \
             (closest preceding macro is `tracing::{macro_name}!`). \
             Re-promotion to INFO/WARN restores the #4251 / #4272 1Hz-heartbeat regression. \
             (The OTLP send_standalone_event call below is unaffected by the log level.)\n\
             Preceding source (last 200 bytes):\n{tail}"
        );
    }
}