1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use n0_future::time::{self, Duration, Instant};
use tracing::debug;
/// Maximum time for a ping response in the relay protocol.
///
/// [`PingTracker::default`] uses this value as its maximum ping timeout.
pub const PING_TIMEOUT: Duration = Duration::from_secs(5);
/// Minimum timeout for an RTT-based health check ping.
///
/// Used by [`PingTracker::ping_timeout`] as the lower bound when clamping the
/// timeout derived from the last measured round-trip time.
const MIN_HEALTH_CHECK_TIMEOUT: Duration = Duration::from_millis(500);
/// Tracks pings on a single relay connection.
///
/// Only the most recently sent ping is useful; any previously sent ping is forgotten
/// and ignored.
#[derive(Debug)]
pub struct PingTracker {
/// The ping currently awaiting its pong, or `None` when no ping is outstanding.
inner: Option<PingInner>,
/// Upper bound for ping timeouts; also the timeout used while no RTT is known.
max_timeout: Duration,
/// Last measured round-trip time to the relay server.
last_rtt: Option<Duration>,
}
/// State of the single outstanding ping tracked by [`PingTracker`].
#[derive(Debug)]
struct PingInner {
/// Random payload generated for the ping; a pong must carry the same bytes to match.
data: [u8; 8],
/// Instant at which the ping is considered timed out.
deadline: Instant,
/// Instant at which the ping was created, used to measure the round-trip time.
sent_at: Instant,
}
impl Default for PingTracker {
fn default() -> Self {
Self::new(PING_TIMEOUT)
}
}
impl PingTracker {
    /// Creates a new ping tracker with the given maximum ping timeout.
    ///
    /// `max_timeout` is the timeout used until a round-trip time has been measured,
    /// and the upper bound of every RTT-based timeout afterwards.
    pub fn new(max_timeout: Duration) -> Self {
        Self {
            inner: None,
            max_timeout,
            last_rtt: None,
        }
    }

    /// Returns the maximum ping timeout.
    pub fn max_timeout(&self) -> Duration {
        self.max_timeout
    }

    /// Starts a new ping with an RTT-based timeout.
    ///
    /// The timeout is whatever [`Self::ping_timeout`] returns at the time of the call.
    /// Returns the ping data; a pong carrying the same data completes the ping.
    pub fn new_ping(&mut self) -> [u8; 8] {
        let timeout = self.ping_timeout();
        self.new_ping_with_timeout(timeout)
    }

    /// Starts a new ping with a custom timeout.
    ///
    /// Any ping that is still outstanding is replaced and will no longer be matched
    /// by [`Self::pong_received`] nor time out in [`Self::timeout`].
    ///
    /// Returns the freshly generated random ping data.
    pub fn new_ping_with_timeout(&mut self, timeout: Duration) -> [u8; 8] {
        let ping_data = rand::random();
        let now = Instant::now();
        debug!(data = ?ping_data, "Sending ping to relay server.");
        self.inner = Some(PingInner {
            data: ping_data,
            deadline: now + timeout,
            sent_at: now,
        });
        ping_data
    }

    /// Updates the ping tracker with a received pong.
    ///
    /// Only the pong of the most recent ping will do anything: it records the measured
    /// round-trip time and clears the outstanding ping. There is no harm feeding any
    /// other pong, it is silently ignored.
    pub fn pong_received(&mut self, data: [u8; 8]) {
        if let Some(inner) = &self.inner
            && inner.data == data
        {
            let rtt = inner.sent_at.elapsed();
            debug!(?data, ?rtt, "Pong received from relay server");
            self.last_rtt = Some(rtt);
            self.inner = None;
        }
    }

    /// Returns the timeout for the next ping.
    ///
    /// Uses 3x the last measured RTT (to account for jitter), raised to at least
    /// [`MIN_HEALTH_CHECK_TIMEOUT`] and capped at the configured maximum timeout.
    /// Falls back to the maximum timeout if no RTT has been measured yet.
    ///
    /// The configured maximum always wins: if the tracker was created with a maximum
    /// below [`MIN_HEALTH_CHECK_TIMEOUT`], that maximum is returned.
    pub fn ping_timeout(&self) -> Duration {
        match self.last_rtt {
            // `Ord::clamp` panics when `min > max`, which happens as soon as the tracker
            // is configured with a maximum below `MIN_HEALTH_CHECK_TIMEOUT`. Applying the
            // lower bound first and the upper bound last gives the same result whenever
            // `min <= max` and never panics. `saturating_mul` likewise avoids the overflow
            // panic of `Duration * u32`.
            Some(rtt) => rtt
                .saturating_mul(3)
                .max(MIN_HEALTH_CHECK_TIMEOUT)
                .min(self.max_timeout),
            None => self.max_timeout,
        }
    }

    /// Cancel-safe waiting for a ping timeout.
    ///
    /// Unless the most recent sent ping times out, this will never return. When it does
    /// return, the timed-out ping has been cleared from the tracker.
    ///
    /// The tracker is only modified after the deadline has passed, so dropping the
    /// future earlier leaves the outstanding ping untouched.
    pub async fn timeout(&mut self) {
        match self.inner {
            Some(PingInner { deadline, data, .. }) => {
                time::sleep_until(deadline).await;
                debug!(?data, "Ping timeout.");
                self.inner = None;
            }
            // No ping outstanding: there is nothing that could time out.
            None => std::future::pending().await,
        }
    }
}