trillium_http/h2/connection/ping.rs
//! PING / PING-ACK round-trip tracking.
//!
//! [`H2Connection::send_ping`][super::H2Connection::send_ping] returns a [`SendPing`] future
//! that resolves with the round-trip time once the peer's `PING ACK` arrives. The driver-side
//! hooks ([`drain_pending_ping_outbound`][super::H2Connection::drain_pending_ping_outbound],
//! [`complete_pending_ping`][super::H2Connection::complete_pending_ping],
//! [`fail_pending_pings`][super::H2Connection::fail_pending_pings]) are the counterparts that
//! run inside the driver task: they drain the outbound queue, record an arriving ACK, and
//! fail outstanding PINGs when the connection closes, respectively.
9
10use super::H2Connection;
11use std::{
12 future::Future,
13 io,
14 pin::Pin,
15 task::{Context, Poll, Waker},
16 time::{Duration, Instant},
17};
18
/// Bookkeeping for one active PING that has been registered on the connection and whose
/// future has not yet collected a result.
#[derive(Debug)]
pub(crate) struct PendingPing {
    /// Instant at which the PING was registered; the round-trip time is measured from here.
    pub(crate) sent_at: Instant,
    /// Waker stored by the most recent `SendPing::poll` that returned `Pending`, if any.
    pub(crate) waker: Option<Waker>,
    /// Outcome recorded by the driver (round-trip time or error), held until the future
    /// takes it.
    pub(crate) completed: Option<io::Result<Duration>>,
}
26
27/// Future returned by [`H2Connection::send_ping`].
28///
29/// Resolves to the round-trip time once the peer's PING ACK arrives, or to an `io::Error`
30/// if the connection closes first. Dropping the future before completion removes the
31/// pending entry so the [`H2Connection`]'s map doesn't accumulate stale state.
32#[must_use = "futures do nothing unless awaited"]
33#[derive(Debug)]
34pub struct SendPing<'a> {
35 pub(super) connection: &'a H2Connection,
36 pub(super) opaque: [u8; 8],
37 /// `true` while this future still owns an entry in `pending_pings` that `Drop` must
38 /// remove. Set to `false` once registration fails (duplicate opaque) or `poll` returns
39 /// `Ready` with the entry removed.
40 pub(super) needs_cleanup: bool,
41}
42
43impl Future for SendPing<'_> {
44 type Output = io::Result<Duration>;
45
46 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47 let this = self.get_mut();
48 if !this.needs_cleanup {
49 return Poll::Ready(Err(io::Error::new(
50 io::ErrorKind::AlreadyExists,
51 "PING with this opaque payload is already in flight",
52 )));
53 }
54 let mut pending = this
55 .connection
56 .pending_pings
57 .lock()
58 .expect("pending_pings mutex poisoned");
59 let entry = pending
60 .get_mut(&this.opaque)
61 .expect("pending_pings entry removed while SendPing future still pending");
62 if let Some(result) = entry.completed.take() {
63 pending.remove(&this.opaque);
64 this.needs_cleanup = false;
65 return Poll::Ready(result);
66 }
67 entry.waker = Some(cx.waker().clone());
68 Poll::Pending
69 }
70}
71
72impl Drop for SendPing<'_> {
73 fn drop(&mut self) {
74 if self.needs_cleanup
75 && let Ok(mut pending) = self.connection.pending_pings.lock()
76 {
77 pending.remove(&self.opaque);
78 }
79 }
80}
81
82impl H2Connection {
83 /// Send a `PING` frame to the peer and resolve when its `PING ACK` arrives, returning
84 /// the round-trip time.
85 ///
86 /// `opaque` is the 8-byte payload echoed back by the peer (RFC 9113 §6.7). Caller picks
87 /// the value — typically a counter or a random nonce. A `PING` whose opaque payload is
88 /// already in flight on this connection resolves to `io::ErrorKind::AlreadyExists`.
89 ///
90 /// No internal timeout. Wrap the returned future with the runtime's
91 /// `race_with_timeout` (or equivalent) to bound the wait.
92 ///
93 /// # Cancel safety
94 ///
95 /// Dropping the returned future before completion removes the pending entry from this
96 /// connection's tracking map. The PING frame may still go out (or already have gone
97 /// out) and the peer's ACK is silently dropped. Re-using the same `opaque` after drop
98 /// is safe.
99 ///
100 /// # Panics
101 ///
102 /// Panics if any of the per-connection mutexes is poisoned (a previous thread panicked
103 /// while holding the lock) — same posture as the rest of the h2 driver's mutex usage.
104 pub fn send_ping(&self, opaque: [u8; 8]) -> SendPing<'_> {
105 let mut pending = self
106 .pending_pings
107 .lock()
108 .expect("pending_pings mutex poisoned");
109 if pending.contains_key(&opaque) {
110 return SendPing {
111 connection: self,
112 opaque,
113 needs_cleanup: false,
114 };
115 }
116 pending.insert(
117 opaque,
118 PendingPing {
119 sent_at: Instant::now(),
120 waker: None,
121 completed: None,
122 },
123 );
124 drop(pending);
125 self.pending_ping_outbound
126 .lock()
127 .expect("pending_ping_outbound mutex poisoned")
128 .push_back(opaque);
129 self.outbound_waker.wake();
130 SendPing {
131 connection: self,
132 opaque,
133 needs_cleanup: true,
134 }
135 }
136
137 /// Driver-side: drain the queue of outbound active PING opaque payloads. Called from
138 /// the driver's `service_handler_signals` tick.
139 pub(in crate::h2) fn drain_pending_ping_outbound(&self) -> Vec<[u8; 8]> {
140 let mut queue = self
141 .pending_ping_outbound
142 .lock()
143 .expect("pending_ping_outbound mutex poisoned");
144 queue.drain(..).collect()
145 }
146
147 /// Driver-side: a `PING ACK` for the given opaque payload arrived. Marks the pending
148 /// entry complete with the elapsed RTT and wakes its waker, if any. A no-op if the
149 /// payload doesn't match an outstanding PING (unsolicited ACK).
150 pub(in crate::h2) fn complete_pending_ping(&self, opaque: [u8; 8]) {
151 let mut pending = self
152 .pending_pings
153 .lock()
154 .expect("pending_pings mutex poisoned");
155 if let Some(entry) = pending.get_mut(&opaque) {
156 let elapsed = entry.sent_at.elapsed();
157 entry.completed = Some(Ok(elapsed));
158 if let Some(waker) = entry.waker.take() {
159 waker.wake();
160 }
161 }
162 }
163
164 /// Driver-side: connection is closing. Complete every outstanding PING with the given
165 /// error so awaiting `send_ping` futures don't block forever.
166 pub(in crate::h2) fn fail_pending_pings(
167 &self,
168 error_kind: io::ErrorKind,
169 message: &'static str,
170 ) {
171 let mut pending = self
172 .pending_pings
173 .lock()
174 .expect("pending_pings mutex poisoned");
175 for entry in pending.values_mut() {
176 if entry.completed.is_none() {
177 entry.completed = Some(Err(io::Error::new(error_kind, message)));
178 if let Some(waker) = entry.waker.take() {
179 waker.wake();
180 }
181 }
182 }
183 }
184}