Skip to main content

trillium_http/h2/connection/
ping.rs

1//! PING / PING-ACK round-trip tracking.
2//!
3//! [`H2Connection::send_ping`][super::H2Connection::send_ping] returns a [`SendPing`] future
4//! that resolves with the round-trip time once the peer's `PING ACK` arrives. The driver-side
5//! hooks ([`drain_pending_ping_outbound`][super::H2Connection::drain_pending_ping_outbound],
6//! [`complete_pending_ping`][super::H2Connection::complete_pending_ping],
7//! [`fail_pending_pings`][super::H2Connection::fail_pending_pings]) are the in-driver-task
8//! counterparts: queue-drain, ack-arrival, connection-close.
9
10use super::H2Connection;
11use std::{
12    future::Future,
13    io,
14    pin::Pin,
15    task::{Context, Poll, Waker},
16    time::{Duration, Instant},
17};
18
/// Tracks a single outstanding active PING's lifecycle.
///
/// An entry is inserted into the connection's `pending_pings` map by `send_ping`, has its
/// `completed` slot filled by `complete_pending_ping` or `fail_pending_pings`, and is removed
/// by the owning `SendPing` future when it observes the result or is dropped.
#[derive(Debug)]
pub(crate) struct PendingPing {
    /// Instant captured when `send_ping` registered this entry. The reported round-trip
    /// time is `sent_at.elapsed()` at the moment the ACK is processed.
    pub(crate) sent_at: Instant,
    /// Waker stored by `SendPing::poll` when it returns `Pending`; taken and woken when the
    /// entry is completed or failed. `None` until the future has been polled.
    pub(crate) waker: Option<Waker>,
    /// Outcome slot: `None` while the PING is in flight, `Some(Ok(rtt))` once the ACK
    /// arrived, `Some(Err(_))` once the pings were failed. `SendPing::poll` takes the value.
    pub(crate) completed: Option<io::Result<Duration>>,
}
26
/// Future returned by [`H2Connection::send_ping`].
///
/// Resolves to the round-trip time once the peer's PING ACK arrives, or to an `io::Error`
/// if the connection closes first. Dropping the future before completion removes the
/// pending entry so the [`H2Connection`]'s map doesn't accumulate stale state.
#[must_use = "futures do nothing unless awaited"]
#[derive(Debug)]
pub struct SendPing<'a> {
    /// Connection whose `pending_pings` map holds (or would have held) this PING's entry.
    pub(super) connection: &'a H2Connection,
    /// The 8-byte PING payload; also the key used to look up the entry in `pending_pings`.
    pub(super) opaque: [u8; 8],
    /// `true` while this future still owns an entry in `pending_pings` that `Drop` must
    /// remove. Set to `false` once registration fails (duplicate opaque) or `poll` returns
    /// `Ready` with the entry removed.
    ///
    /// `poll` checks this flag first: when it is `false`, `poll` resolves immediately with
    /// `io::ErrorKind::AlreadyExists` without touching the map.
    pub(super) needs_cleanup: bool,
}
42
43impl Future for SendPing<'_> {
44    type Output = io::Result<Duration>;
45
46    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47        let this = self.get_mut();
48        if !this.needs_cleanup {
49            return Poll::Ready(Err(io::Error::new(
50                io::ErrorKind::AlreadyExists,
51                "PING with this opaque payload is already in flight",
52            )));
53        }
54        let mut pending = this
55            .connection
56            .pending_pings
57            .lock()
58            .expect("pending_pings mutex poisoned");
59        let entry = pending
60            .get_mut(&this.opaque)
61            .expect("pending_pings entry removed while SendPing future still pending");
62        if let Some(result) = entry.completed.take() {
63            pending.remove(&this.opaque);
64            this.needs_cleanup = false;
65            return Poll::Ready(result);
66        }
67        entry.waker = Some(cx.waker().clone());
68        Poll::Pending
69    }
70}
71
72impl Drop for SendPing<'_> {
73    fn drop(&mut self) {
74        if self.needs_cleanup
75            && let Ok(mut pending) = self.connection.pending_pings.lock()
76        {
77            pending.remove(&self.opaque);
78        }
79    }
80}
81
82impl H2Connection {
83    /// Send a `PING` frame to the peer and resolve when its `PING ACK` arrives, returning
84    /// the round-trip time.
85    ///
86    /// `opaque` is the 8-byte payload echoed back by the peer (RFC 9113 §6.7). Caller picks
87    /// the value — typically a counter or a random nonce. A `PING` whose opaque payload is
88    /// already in flight on this connection resolves to `io::ErrorKind::AlreadyExists`.
89    ///
90    /// No internal timeout. Wrap the returned future with the runtime's
91    /// `race_with_timeout` (or equivalent) to bound the wait.
92    ///
93    /// # Cancel safety
94    ///
95    /// Dropping the returned future before completion removes the pending entry from this
96    /// connection's tracking map. The PING frame may still go out (or already have gone
97    /// out) and the peer's ACK is silently dropped. Re-using the same `opaque` after drop
98    /// is safe.
99    ///
100    /// # Panics
101    ///
102    /// Panics if any of the per-connection mutexes is poisoned (a previous thread panicked
103    /// while holding the lock) — same posture as the rest of the h2 driver's mutex usage.
104    pub fn send_ping(&self, opaque: [u8; 8]) -> SendPing<'_> {
105        let mut pending = self
106            .pending_pings
107            .lock()
108            .expect("pending_pings mutex poisoned");
109        if pending.contains_key(&opaque) {
110            return SendPing {
111                connection: self,
112                opaque,
113                needs_cleanup: false,
114            };
115        }
116        pending.insert(
117            opaque,
118            PendingPing {
119                sent_at: Instant::now(),
120                waker: None,
121                completed: None,
122            },
123        );
124        drop(pending);
125        self.pending_ping_outbound
126            .lock()
127            .expect("pending_ping_outbound mutex poisoned")
128            .push_back(opaque);
129        self.outbound_waker.wake();
130        SendPing {
131            connection: self,
132            opaque,
133            needs_cleanup: true,
134        }
135    }
136
137    /// Driver-side: drain the queue of outbound active PING opaque payloads. Called from
138    /// the driver's `service_handler_signals` tick.
139    pub(in crate::h2) fn drain_pending_ping_outbound(&self) -> Vec<[u8; 8]> {
140        let mut queue = self
141            .pending_ping_outbound
142            .lock()
143            .expect("pending_ping_outbound mutex poisoned");
144        queue.drain(..).collect()
145    }
146
147    /// Driver-side: a `PING ACK` for the given opaque payload arrived. Marks the pending
148    /// entry complete with the elapsed RTT and wakes its waker, if any. A no-op if the
149    /// payload doesn't match an outstanding PING (unsolicited ACK).
150    pub(in crate::h2) fn complete_pending_ping(&self, opaque: [u8; 8]) {
151        let mut pending = self
152            .pending_pings
153            .lock()
154            .expect("pending_pings mutex poisoned");
155        if let Some(entry) = pending.get_mut(&opaque) {
156            let elapsed = entry.sent_at.elapsed();
157            entry.completed = Some(Ok(elapsed));
158            if let Some(waker) = entry.waker.take() {
159                waker.wake();
160            }
161        }
162    }
163
164    /// Driver-side: connection is closing. Complete every outstanding PING with the given
165    /// error so awaiting `send_ping` futures don't block forever.
166    pub(in crate::h2) fn fail_pending_pings(
167        &self,
168        error_kind: io::ErrorKind,
169        message: &'static str,
170    ) {
171        let mut pending = self
172            .pending_pings
173            .lock()
174            .expect("pending_pings mutex poisoned");
175        for entry in pending.values_mut() {
176            if entry.completed.is_none() {
177                entry.completed = Some(Err(io::Error::new(error_kind, message)));
178                if let Some(waker) = entry.waker.take() {
179                    waker.wake();
180                }
181            }
182        }
183    }
184}