use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::Duration;
use tokio::time::Instant;
use crate::config::GlobalRng;
use crate::simulation::RealTime;
use crate::transport::DefaultSocket;
use crate::transport::rolling_rtt_stats::RollingRttStats;
/// Default probe destination: `1.1.1.1` on port 53.
pub(crate) const DEFAULT_REFERENCE_TARGET: SocketAddr =
    SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), 53);
/// Period of the ticker that paces `run_probe_loop` (one probe attempt per tick).
const PROBE_INTERVAL: Duration = Duration::from_secs(1);
/// Upper bound on how long `probe_once` waits for a matching reply.
const RESPONSE_TIMEOUT: Duration = Duration::from_secs(1);
/// Domain name placed in the question section of every probe query.
const QUERY_NAME: &str = "freenet.org";
/// Starts the background reference-ping task and registers its join handle
/// with `monitor` under the name `"reference_ping"`.
///
/// The task binds a UDP socket on the wildcard address of the same family as
/// `target` and then probes `target` indefinitely via `run_probe_loop`,
/// passing `local_peer_id` along for the emitted telemetry.
///
/// If the bind fails, the task logs a warning and then awaits a future that
/// never completes rather than returning, so the registered handle does not
/// finish on that path.
pub(crate) fn spawn_reference_ping(
    local_peer_id: String,
    target: SocketAddr,
    monitor: &crate::node::background_task_monitor::BackgroundTaskMonitor,
) {
    let task = tokio::spawn(async move {
        // Wildcard address in the target's family, port 0.
        let wildcard_ip = if target.is_ipv4() {
            IpAddr::V4(Ipv4Addr::UNSPECIFIED)
        } else {
            IpAddr::V6(Ipv6Addr::UNSPECIFIED)
        };
        let bind_addr = SocketAddr::new(wildcard_ip, 0);

        match DefaultSocket::bind(bind_addr).await {
            Ok(socket) => run_probe_loop(local_peer_id, target, socket).await,
            Err(bind_err) => {
                tracing::warn!(
                    target: "freenet::transport::reference_ping",
                    error = %bind_err,
                    %target,
                    "reference-ping disabled: ephemeral UDP socket bind failed"
                );
                // Park forever instead of completing the task.
                std::future::pending::<()>().await;
            }
        }
    });
    monitor.register("reference_ping", task);
}
/// Probes `target` once per `PROBE_INTERVAL` tick, forever.
///
/// Each measured round-trip time is recorded into a `RollingRttStats`
/// instance, and a snapshot is emitted after every attempt, whether or not
/// the attempt produced a sample. This function never returns.
async fn run_probe_loop(local_peer_id: String, target: SocketAddr, socket: DefaultSocket) {
    use tokio::time::{interval, MissedTickBehavior};

    let rtt_stats = RollingRttStats::new(RealTime::new());
    let mut probe_timer = interval(PROBE_INTERVAL);
    probe_timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
    // Consume one tick before entering the loop; the loop then waits for the
    // next tick ahead of each probe.
    probe_timer.tick().await;

    loop {
        probe_timer.tick().await;
        // Timeouts and send failures contribute no sample, but a snapshot is
        // still emitted for them below.
        if let ProbeOutcome::Sample(rtt) = probe_once(&socket, target).await {
            rtt_stats.record(rtt);
        }
        emit_snapshot(&local_peer_id, target, &rtt_stats);
    }
}
/// Result of a single probe attempt made by `probe_once`.
#[derive(Debug, PartialEq, Eq)]
enum ProbeOutcome {
    /// A matching reply arrived; carries the time elapsed since just before
    /// the query was sent.
    Sample(Duration),
    /// No matching reply arrived within `RESPONSE_TIMEOUT`, or receiving from
    /// the socket failed.
    NoResponse,
    /// The query could not be sent.
    SendError,
}
/// Sends one DNS query for `QUERY_NAME` to `target` and waits up to
/// `RESPONSE_TIMEOUT` for the matching reply.
///
/// Returns:
/// - `ProbeOutcome::SendError` if the query could not be sent,
/// - `ProbeOutcome::Sample(rtt)` when a datagram from `target` carrying the
///   same transaction id (and accepted by `is_matching_dns_response`) arrives
///   in time, where `rtt` is measured from just before the send,
/// - `ProbeOutcome::NoResponse` on timeout or when receiving fails.
///
/// Datagrams from other senders, or ones that do not match the query, are
/// skipped and the wait continues until the deadline.
async fn probe_once(socket: &DefaultSocket, target: SocketAddr) -> ProbeOutcome {
    let tx_id = (GlobalRng::random_u32() & 0xFFFF) as u16;
    let query = build_dns_query(tx_id, QUERY_NAME);

    // The clock starts before the send, so the sample includes send time.
    let sent_at = Instant::now();
    if socket.send_to(&query, target).await.is_err() {
        return ProbeOutcome::SendError;
    }

    let mut datagram = [0u8; 512];
    let wait_for_reply = async {
        loop {
            let received = socket.recv_from(&mut datagram).await;
            match received {
                Ok((len, sender)) => {
                    if sender == target && is_matching_dns_response(&datagram[..len], tx_id) {
                        break Some(sent_at.elapsed());
                    }
                    // Unrelated datagram: keep listening.
                }
                // A receive failure ends this probe without a sample.
                Err(_) => break None,
            }
        }
    };

    match tokio::time::timeout(RESPONSE_TIMEOUT, wait_for_reply).await {
        Ok(Some(rtt)) => ProbeOutcome::Sample(rtt),
        Ok(None) | Err(_) => ProbeOutcome::NoResponse,
    }
}
/// Publishes the current state of `stats` as a `shadow_reference_ping` event.
///
/// The snapshot's durations are converted to whole microseconds. When no
/// snapshot is available, the duration fields are reported as absent and both
/// sample counts as zero. The values are written to a debug-level log line
/// first and then sent as a standalone telemetry event attributed to
/// `local_peer_id`.
fn emit_snapshot(local_peer_id: &str, target: SocketAddr, stats: &RollingRttStats<RealTime>) {
    let (baseline_min_us, recent_median_us, inflation_us, baseline_samples, recent_samples) =
        if let Some(snap) = stats.snapshot() {
            (
                snap.baseline_min.map(|d| d.as_micros() as u64),
                snap.recent_median.map(|d| d.as_micros() as u64),
                snap.inflation.map(|d| d.as_micros() as u64),
                snap.baseline_samples as u64,
                snap.recent_samples as u64,
            )
        } else {
            (None, None, None, 0, 0)
        };

    tracing::debug!(
        target: "freenet::transport::reference_ping",
        %target,
        baseline_min_us,
        recent_median_us,
        inflation_us,
        baseline_samples,
        recent_samples,
        "shadow_reference_ping"
    );

    let payload = serde_json::json!({
        "target": target.to_string(),
        "baseline_min_us": baseline_min_us,
        "recent_median_us": recent_median_us,
        "inflation_us": inflation_us,
        "baseline_samples": baseline_samples,
        "recent_samples": recent_samples,
    });
    crate::tracing::telemetry::send_standalone_event_with_peer_id(
        "shadow_reference_ping",
        local_peer_id,
        payload,
    );
}
/// Builds a minimal DNS query packet asking for the A record of `name`.
///
/// Layout (all integers big-endian):
/// - 12-byte header: `tx_id`, flags `0x0100` (recursion desired), one
///   question, zero answer / authority / additional records;
/// - the question name as length-prefixed labels terminated by a zero byte;
/// - QTYPE `1` (A) and QCLASS `1` (IN).
///
/// `name` is split on `'.'`. Empty labels (from a trailing dot, consecutive
/// dots, or an empty `name`) are skipped: a zero-length label is the root
/// terminator, so emitting one mid-name would end the name early and leave
/// stray bytes behind it. Labels longer than 63 bytes are truncated to 63,
/// the maximum a single length byte may express for a label.
fn build_dns_query(tx_id: u16, name: &str) -> Vec<u8> {
    /// Largest label length representable in a DNS name.
    const MAX_LABEL_LEN: usize = 63;

    // 12 header bytes + name with one extra length byte + root byte + 4
    // bytes of QTYPE/QCLASS.
    let mut buf = Vec::with_capacity(name.len() + 18);

    // Header.
    buf.extend_from_slice(&tx_id.to_be_bytes());
    buf.extend_from_slice(&0x0100u16.to_be_bytes()); // flags: RD set
    buf.extend_from_slice(&1u16.to_be_bytes()); // QDCOUNT
    buf.extend_from_slice(&0u16.to_be_bytes()); // ANCOUNT
    buf.extend_from_slice(&0u16.to_be_bytes()); // NSCOUNT
    buf.extend_from_slice(&0u16.to_be_bytes()); // ARCOUNT

    // Question name.
    for label in name.split('.').filter(|label| !label.is_empty()) {
        let bytes = label.as_bytes();
        let len = bytes.len().min(MAX_LABEL_LEN);
        // `len` is at most 63, so the cast cannot truncate.
        buf.push(len as u8);
        buf.extend_from_slice(&bytes[..len]);
    }
    buf.push(0); // root label terminates the name

    buf.extend_from_slice(&1u16.to_be_bytes()); // QTYPE = A
    buf.extend_from_slice(&1u16.to_be_bytes()); // QCLASS = IN
    buf
}
/// Reports whether `buf` looks like the DNS response to the query that was
/// sent with transaction id `expected_id`.
///
/// A datagram is accepted only when all of the following hold:
/// - it is at least 12 bytes long (a full DNS header),
/// - its first two bytes, read big-endian, equal `expected_id`,
/// - the top bit of its third byte (the QR flag) is set.
///
/// Nothing past the header is inspected.
fn is_matching_dns_response(buf: &[u8], expected_id: u16) -> bool {
    const HEADER_LEN: usize = 12;
    const QR_RESPONSE_BIT: u8 = 0x80;

    match buf {
        [id_hi, id_lo, flags_hi, ..] if buf.len() >= HEADER_LEN => {
            let id_matches = u16::from_be_bytes([*id_hi, *id_lo]) == expected_id;
            let is_response = (*flags_hi & QR_RESPONSE_BIT) != 0;
            id_matches && is_response
        }
        _ => false,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Binds a UDP socket to an OS-assigned port on the IPv4 loopback address.
    async fn bind_loopback() -> DefaultSocket {
        DefaultSocket::bind("127.0.0.1:0".parse::<SocketAddr>().unwrap())
            .await
            .unwrap()
    }

    /// The 12-byte header carries the transaction id, the flags word and the
    /// four record counts in the expected positions.
    #[test]
    fn dns_query_header_is_well_formed() {
        let q = build_dns_query(0xABCD, "freenet.org");
        assert_eq!(&q[0..2], &[0xAB, 0xCD]); // transaction id
        assert_eq!(&q[2..4], &[0x01, 0x00]); // flags
        assert_eq!(&q[4..6], &[0x00, 0x01]); // one question
        assert_eq!(&q[6..8], &[0x00, 0x00]);
        assert_eq!(&q[8..10], &[0x00, 0x00]);
        assert_eq!(&q[10..12], &[0x00, 0x00]);
    }

    /// The question section holds length-prefixed labels, a zero terminator,
    /// then two 16-bit fields both equal to 1.
    #[test]
    fn dns_query_encodes_qname() {
        let q = build_dns_query(0, "freenet.org");
        let qsection = &q[12..];
        assert_eq!(qsection[0], 7);
        assert_eq!(&qsection[1..8], b"freenet");
        assert_eq!(qsection[8], 3);
        assert_eq!(&qsection[9..12], b"org");
        assert_eq!(qsection[12], 0);
        assert_eq!(&qsection[13..15], &[0x00, 0x01]);
        assert_eq!(&qsection[15..17], &[0x00, 0x01]);
    }

    #[test]
    fn response_validation_accepts_matching_id_with_qr_bit() {
        let mut resp = vec![0u8; 12];
        resp[0] = 0xCA;
        resp[1] = 0xFE;
        resp[2] = 0x81;
        resp[3] = 0x80;
        assert!(is_matching_dns_response(&resp, 0xCAFE));
    }

    #[test]
    fn response_validation_rejects_mismatched_id() {
        let mut resp = vec![0u8; 12];
        resp[0] = 0xCA;
        resp[1] = 0xFE;
        resp[2] = 0x80;
        assert!(!is_matching_dns_response(&resp, 0xDEAD));
    }

    #[test]
    fn response_validation_rejects_qr_zero() {
        let mut resp = vec![0u8; 12];
        resp[0] = 0xCA;
        resp[1] = 0xFE;
        resp[2] = 0x00;
        assert!(!is_matching_dns_response(&resp, 0xCAFE));
    }

    #[test]
    fn response_validation_rejects_short_buffer() {
        assert!(!is_matching_dns_response(&[], 0));
        assert!(!is_matching_dns_response(&[0u8; 11], 0));

        // One byte short of a header, but with a matching id and the QR bit
        // set: only the length guard can reject this input.
        let mut truncated = [0u8; 11];
        truncated[0] = 0xCA;
        truncated[1] = 0xFE;
        truncated[2] = 0x80;
        assert!(!is_matching_dns_response(&truncated, 0xCAFE));

        // Exactly header-sized with a matching id, but the QR bit is clear.
        assert!(!is_matching_dns_response(&[0u8; 12], 0));
    }

    /// A loopback peer that echoes the transaction id with the QR bit set
    /// yields a `Sample`.
    #[tokio::test]
    async fn probe_loopback_round_trip_records_sample() {
        let server = bind_loopback().await;
        let server_addr = server.local_addr().unwrap();
        let client = bind_loopback().await;

        let echo = tokio::spawn(async move {
            let mut buf = [0u8; 512];
            let (n, from) = server.recv_from(&mut buf).await.unwrap();
            let mut resp = vec![0u8; 12];
            resp[0] = buf[0];
            resp[1] = buf[1];
            resp[2] = 0x80;
            server.send_to(&resp, from).await.unwrap();
            n
        });

        let outcome = probe_once(&client, server_addr).await;
        echo.await.unwrap();

        match outcome {
            ProbeOutcome::Sample(rtt) => {
                assert!(
                    rtt < Duration::from_secs(1),
                    "localhost round trip must be well under timeout, got {rtt:?}"
                );
            }
            ProbeOutcome::NoResponse | ProbeOutcome::SendError => {
                panic!("expected Sample, got {outcome:?}")
            }
        }
    }

    /// A bound peer that never replies produces `NoResponse` within roughly
    /// `RESPONSE_TIMEOUT`.
    #[tokio::test]
    async fn probe_timeout_when_no_response() {
        let silent = bind_loopback().await;
        let silent_addr = silent.local_addr().unwrap();
        let client = bind_loopback().await;

        let start = Instant::now();
        let outcome = probe_once(&client, silent_addr).await;
        let elapsed = start.elapsed();

        assert_eq!(outcome, ProbeOutcome::NoResponse);
        assert!(
            elapsed < RESPONSE_TIMEOUT + Duration::from_secs(1),
            "probe_once must respect RESPONSE_TIMEOUT, elapsed {elapsed:?}"
        );
        // Keep the silent socket bound for the whole probe.
        drop(silent);
    }

    /// A response-shaped datagram from a socket other than the target must
    /// not end the probe early.
    #[tokio::test]
    async fn probe_ignores_response_from_unrelated_sender() {
        let target = bind_loopback().await;
        let target_addr = target.local_addr().unwrap();
        let interloper = bind_loopback().await;
        let client = bind_loopback().await;
        let client_addr = client.local_addr().unwrap();

        // Queue a datagram with the QR bit set on the client socket before
        // the probe starts listening.
        let bogus_response = {
            let mut resp = vec![0u8; 12];
            resp[2] = 0x80;
            resp
        };
        interloper
            .send_to(&bogus_response, client_addr)
            .await
            .unwrap();

        let start = Instant::now();
        let outcome = probe_once(&client, target_addr).await;
        let elapsed = start.elapsed();
        drop(target);
        drop(interloper);

        assert_eq!(outcome, ProbeOutcome::NoResponse);
        assert!(
            elapsed >= RESPONSE_TIMEOUT / 2,
            "probe must keep waiting after rejecting an unrelated sender, \
             elapsed {elapsed:?}"
        );
    }

    /// After several probe intervals against a silent target, the spawned
    /// task has not exited.
    #[tokio::test(start_paused = true)]
    async fn spawn_survives_repeated_timeouts() {
        use crate::node::background_task_monitor::BackgroundTaskMonitor;

        let silent = bind_loopback().await;
        let silent_addr = silent.local_addr().unwrap();
        let monitor = BackgroundTaskMonitor::new();
        spawn_reference_ping("test-peer".to_string(), silent_addr, &monitor);

        tokio::time::advance(Duration::from_secs(5)).await;
        tokio::task::yield_now().await;

        let exit = monitor.wait_for_any_exit();
        tokio::pin!(exit);
        let still_running = tokio::time::timeout(Duration::from_millis(50), &mut exit)
            .await
            .is_err();
        assert!(
            still_running,
            "reference_ping task must still be alive after repeated timeouts"
        );
        drop(silent);
    }

    /// Checks that a task parked on a never-completing future does not
    /// trigger `wait_for_any_exit`.
    ///
    /// NOTE(review): this registers a stand-in task that awaits
    /// `std::future::pending` directly. It does not call
    /// `spawn_reference_ping`, so the real bind-failure branch is not
    /// exercised here.
    #[tokio::test(start_paused = true)]
    async fn spawn_with_unbindable_target_does_not_exit() {
        use crate::node::background_task_monitor::BackgroundTaskMonitor;

        let monitor = BackgroundTaskMonitor::new();
        let handle = tokio::spawn(async move {
            std::future::pending::<()>().await;
        });
        monitor.register("reference_ping_bind_fail_sim", handle);

        tokio::time::advance(Duration::from_secs(10)).await;
        tokio::task::yield_now().await;

        let exit = monitor.wait_for_any_exit();
        tokio::pin!(exit);
        let still_running = tokio::time::timeout(Duration::from_millis(50), &mut exit)
            .await
            .is_err();
        assert!(
            still_running,
            "bind-error path must hand a non-completing future to the monitor; \
             clean Ok() return would trip wait_for_any_exit and crash the node"
        );
    }
}