// trillium_http/h2/connection.rs
1//! Shared per-connection HTTP/2 state ([`H2Connection`]).
2//!
3//! [`H2Connection`] is `Arc`-shared between the driver task ([`H2Driver`]) and every conn
4//! task that holds an open stream's [`Conn`]. It owns the per-stream `StreamState` map,
5//! the cross-task wake primitive ([`AtomicWaker`]), and the [`HttpContext`] / [`Swansong`]
6//! the broader server stack reaches in through.
7//!
8//! The driver loop itself lives in [`super::acceptor`] — see that module for the
9//! per-connection state machine and how send / receive concerns are split.
10//!
11//! # Module layout
12//!
13//! Conn-task-side primitives are split across child modules so each subsystem reads
14//! independently:
15//!
16//! - [`ping`]: `PING` / `PING ACK` round-trip tracking and the [`SendPing`] future.
17//! - [`peer_settings_wait`]: the [`PeerSettings`] sync primitive that parks until the peer's first
18//! SETTINGS frame is applied.
19//! - [`submit`]: send-staging API ([`submit_send`][H2Connection::submit_send],
20//! [`submit_upgrade`][H2Connection::submit_upgrade]) and client-side stream-open primitives
21//! ([`open_stream`][H2Connection::open_stream] /
22//! [`open_connect_stream`][H2Connection::open_connect_stream]) + the [`SubmitSend`] future.
23//! - [`response`]: client-role recv-side primitives — [`ResponseHeaders`] and
24//! [`take_trailers`][H2Connection::take_trailers].
25//!
26//! [`H2Driver`]: super::H2Driver
27
28mod peer_settings_wait;
29mod ping;
30mod response;
31mod submit;
32
33#[cfg(feature = "unstable")]
34use super::H2Initiator;
35use super::{H2Driver, H2Settings, role::Role, transport::StreamState};
36use crate::{Conn, HttpContext};
37use atomic_waker::AtomicWaker;
38use event_listener::Event;
39use futures_lite::io::{AsyncRead, AsyncWrite};
40use ping::PendingPing;
41use std::{
42 collections::{HashMap, VecDeque},
43 future::Future,
44 io,
45 sync::{
46 Arc, Mutex, MutexGuard,
47 atomic::{AtomicBool, Ordering},
48 },
49};
50use swansong::{ShutdownCompletion, Swansong};
51#[cfg(feature = "unstable")]
52#[allow(unused_imports)]
53// re-exports for h2.rs's `pub use connection::{ResponseHeaders, SubmitSend}`
54pub use {response::ResponseHeaders, submit::SubmitSend};
55
56/// Shared per-connection state for HTTP/2.
57///
58/// Wrapped in an [`Arc`] and held by both the [`H2Driver`] driver and every conn task
59/// that holds an open stream's [`Conn`]. Per-stream `StreamState`, HPACK encoder state, and
60/// connection-level send flow control lives here.
61#[derive(Debug)]
62pub struct H2Connection {
63 pub(super) context: Arc<HttpContext>,
64 pub(super) swansong: Swansong,
65 /// Driver-side waker that conn tasks fire whenever they produce work the driver should
66 /// act on — the is-reading signal on first `H2Transport::poll_read`, and the
67 /// `submit_send` arrival. Single-consumer (the driver); N producers (conn tasks). The
68 /// driver registers its current `drive` waker here each iteration it parks.
69 pub(super) outbound_waker: AtomicWaker,
70 /// Per-stream shared state, keyed by stream id. The driver inserts on stream open and
71 /// removes on close. Conn-task-side code (`ReceivedBody`, `Conn::send_h2`) looks up
72 /// via private accessor methods on `H2Connection` rather than touching the map
73 /// directly — `StreamState` stays module-private. The driver also caches each
74 /// `Arc<StreamState>` in its private `StreamEntry` for hot-loop perf, so every entry
75 /// here has refcount ≥ 2 while the stream is open.
76 pub(super) streams: Mutex<HashMap<u32, Arc<StreamState>>>,
77 /// The peer's most recently announced SETTINGS values. Written by the driver each time a
78 /// SETTINGS frame arrives (or, for the initial SETTINGS, the first one); read from the
79 /// driver's send path when it needs to respect peer-advertised limits (HEADERS fragment
80 /// size, stream send-window seed, `MAX_HEADER_LIST_SIZE` cap). Single-task access (only
81 /// the driver touches this), so a plain `Mutex` suffices — the `RwLock` optimisation for
82 /// concurrent shared reads would be wasted here. `H2Settings` is `Copy`, so readers
83 /// typically take the guard, copy out, and release.
84 ///
85 /// Default-constructed (all fields `None`) means "peer has not yet sent SETTINGS";
86 /// readers should use [`H2Settings::effective_*`][H2Settings::effective_max_frame_size]
87 /// helpers that apply the RFC 9113 §6.5.2 defaults to absent fields.
88 pub(super) peer_settings: Mutex<H2Settings>,
89 /// Latch flipped to `true` the first (and every subsequent) time the driver applies a
90 /// peer SETTINGS frame. Distinct from `peer_settings` because an absent field in
91 /// `H2Settings` is ambiguous between "peer hasn't sent SETTINGS yet" and "peer sent
92 /// SETTINGS without that field" — the latch disambiguates. Read by [`PeerSettings`] to
93 /// gate operations that require seeing the peer's first SETTINGS (RFC 8441 §3 extended
94 /// CONNECT, in particular).
95 pub(super) peer_settings_received: AtomicBool,
96 /// Multi-listener wake source for [`PeerSettings`]. The driver fires `notify(usize::MAX)`
97 /// after applying peer SETTINGS and again on connection close, so any number of
98 /// concurrently-parked `PeerSettings` futures all unblock together. Using
99 /// [`Event`][event_listener::Event] (rather than a single [`AtomicWaker`]) is necessary
100 /// because multiple application tasks can call [`H2Connection::peer_settings`]
101 /// concurrently — e.g. a fan-out of WebSocket-over-h2 upgrades on one pooled connection
102 /// — and an `AtomicWaker`'s last-writer-wins semantics would strand all but one of them.
103 pub(super) peer_settings_event: Event,
104 /// Next stream id to allocate for client-role outbound streams. RFC 9113 §5.1.1 requires
105 /// client-initiated stream ids to be odd and strictly increasing; we start at 1 and
106 /// `+= 2` per allocation via [`AtomicU32::fetch_update`]. Read/written only by
107 /// [`Self::open_stream`]; the server role never touches it. Capped at `2^31` — once
108 /// exhausted, `fetch_update`'s closure returns `None` so the counter stops advancing
109 /// and further `open_stream` calls return `None` (the caller is expected to fail over
110 /// to a fresh connection).
111 ///
112 /// Gated behind `unstable` so server builds (which never call `open_stream`) don't
113 /// carry the field at all. Matches the existing exposure pattern for the `initiator`
114 /// module and `H2Connection::run_client`.
115 #[cfg(feature = "unstable")]
116 pub(super) next_client_stream_id: std::sync::atomic::AtomicU32,
117 /// Outstanding active PINGs we've sent and are awaiting ACKs for, keyed by opaque
118 /// payload. Populated by [`Self::send_ping`] before the PING is queued for transmission;
119 /// completed by the driver when a `PING { ack: true }` arrives whose payload matches an
120 /// entry. Drained on connection close so awaiting `send_ping` futures don't leak.
121 pub(super) pending_pings: Mutex<HashMap<[u8; 8], PendingPing>>,
122 /// Opaque payloads queued for outbound `PING { ack: false }` emission. The driver
123 /// drains this on each [`service_handler_signals`][super::H2Driver] tick. Decoupled
124 /// from `pending_pings` so registration and queuing can happen atomically from the
125 /// caller's perspective without holding two locks.
126 pub(super) pending_ping_outbound: Mutex<VecDeque<[u8; 8]>>,
127}
128
129impl H2Connection {
130 /// Construct a new `H2Connection` to manage HTTP/2 for a single peer.
131 pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
132 let swansong = context.swansong().child();
133 Arc::new(Self {
134 context,
135 swansong,
136 outbound_waker: AtomicWaker::new(),
137 streams: Mutex::new(HashMap::new()),
138 peer_settings: Mutex::new(H2Settings::default()),
139 peer_settings_received: AtomicBool::new(false),
140 peer_settings_event: Event::new(),
141 #[cfg(feature = "unstable")]
142 next_client_stream_id: std::sync::atomic::AtomicU32::new(1),
143 pending_pings: Mutex::new(HashMap::new()),
144 pending_ping_outbound: Mutex::new(VecDeque::new()),
145 })
146 }
147
148 /// The [`HttpContext`] this connection was constructed with.
149 pub fn context(&self) -> Arc<HttpContext> {
150 self.context.clone()
151 }
152
153 /// The connection-scoped [`Swansong`]. Shuts down on peer GOAWAY or when the server-
154 /// level swansong shuts down.
155 pub fn swansong(&self) -> &Swansong {
156 &self.swansong
157 }
158
159 /// Attempt graceful shutdown of this HTTP/2 connection.
160 pub fn shut_down(&self) -> ShutdownCompletion {
161 self.swansong.shut_down()
162 }
163
164 /// Whether a fresh stream could be opened on this connection right now.
165 ///
166 /// Encapsulates the policy a client multiplexer asks before reusing a pooled
167 /// connection: the connection must be running (no GOAWAY received, swansong not asked
168 /// to shut down), inflight streams must be below the peer's advertised
169 /// `MAX_CONCURRENT_STREAMS`, and the client stream-id space must not be exhausted
170 /// (RFC 9113 §5.1.1 caps client-initiated stream ids at `2^31 - 1`). Future signals
171 /// (priority pressure under RFC 9218, flow-control headroom, etc.) can fold into
172 /// this without changing the call site.
173 ///
174 /// `false` doesn't mean the connection is dead — it might just be saturated and free
175 /// up momentarily. Callers should keep saturated connections in their pool rather than
176 /// evicting; pair this with a separate aliveness check to decide eviction.
177 ///
178 /// Stream-id exhaustion is the one "false" case that *is* permanent: the connection
179 /// will never accept another `open_stream` call. The caller's pool should treat this
180 /// the same as `MAX_CONCURRENT_STREAMS` saturation (Busy → fall through to a fresh
181 /// connection); the connection is still usable for in-flight stream completion.
182 ///
183 /// # Panics
184 ///
185 /// Panics if any of the per-connection mutexes is poisoned.
186 #[cfg(feature = "unstable")]
187 pub fn can_open_stream(&self) -> bool {
188 if !self.swansong.state().is_running() {
189 return false;
190 }
191 // Stream-id space exhausted: a fresh `open_stream` would return `None` because
192 // `fetch_update`'s closure refuses to advance past the cap. Without this check,
193 // an exhausted connection passes the inflight-vs-MAX_CONCURRENT_STREAMS check
194 // (no streams in flight → counts as 0) and the pool selects it as Available,
195 // only for `open_stream` to fail with a misleading "shutting down" error at the
196 // call site.
197 if self.next_client_stream_id.load(Ordering::Relaxed) >= (1u32 << 31) {
198 return false;
199 }
200 // Count wire-active streams only — entries the application is still holding after a
201 // clean wire-close (the h1/h3-symmetric "stream lives until the user drops" lifecycle)
202 // are in the map but no longer count against the peer's MAX_CONCURRENT_STREAMS per RFC
203 // 9113 §5.1's closed-state rule.
204 let inflight: u32 = self
205 .streams_lock()
206 .values()
207 .filter(|s| {
208 !(s.send.completed.load(Ordering::Acquire) && s.recv.eof.load(Ordering::Acquire))
209 })
210 .count()
211 .try_into()
212 .unwrap_or(u32::MAX);
213 let cap = self
214 .current_peer_settings()
215 .effective_max_concurrent_streams();
216 inflight < cap
217 }
218
219 /// Driver-side wake primitive. Conn-task code calls
220 /// `connection.outbound_waker().wake()` after producing work the driver should service
221 /// (an `is_reading` signal, a `submit_send` slot fill).
222 pub(super) fn outbound_waker(&self) -> &AtomicWaker {
223 &self.outbound_waker
224 }
225
226 /// Lock the per-stream `StreamState` map. Used by the driver (insert at stream open,
227 /// remove at close) and by conn-task lookups (e.g. `submit_send`).
228 pub(super) fn streams_lock(&self) -> MutexGuard<'_, HashMap<u32, Arc<StreamState>>> {
229 self.streams
230 .lock()
231 .expect("connection streams mutex poisoned")
232 }
233
234 /// Lock the peer's SETTINGS. Cheap; held only as long as the returned guard lives.
235 /// Use the `effective_*` helpers on [`H2Settings`] to get a value with RFC defaults
236 /// applied for fields the peer hasn't set; typical callers copy out via `*guard` and
237 /// release immediately.
238 pub(super) fn current_peer_settings(&self) -> MutexGuard<'_, H2Settings> {
239 self.peer_settings
240 .lock()
241 .expect("peer_settings mutex poisoned")
242 }
243
244 /// Client-role: signal that the application has dropped its [`H2Transport`] for a
245 /// cleanly wire-closed stream and the driver should now remove the entry from both
246 /// stream maps. No `RST_STREAM` is emitted — the wire side already closed cleanly via
247 /// `END_STREAM` in both directions. This is purely the application-side resource cleanup
248 /// trigger (mirroring h1/h3, where the stream lives until the user drops their handle).
249 ///
250 /// Side effects: sets `StreamState.pending_release` and wakes the driver. No-op on a
251 /// stream that's already gone from the map. Server-role streams never reach here —
252 /// they're removed eagerly when the response finishes sending.
253 pub(crate) fn release_stream(&self, stream_id: u32) {
254 let stream = self.streams_lock().get(&stream_id).cloned();
255 if let Some(stream) = stream {
256 stream.pending_release.store(true, Ordering::Release);
257 stream.needs_servicing.store(true, Ordering::Release);
258 self.outbound_waker.wake();
259 }
260 }
261
262 /// Request that the driver emit `RST_STREAM` on this stream with the given error code
263 /// and clean up. Called from the conn-task side when something in its path (e.g. a
264 /// body-read that detected a content-length violation — RFC 9113 §8.1.2.6) needs the
265 /// stream torn down but can't touch the driver's private state directly.
266 ///
267 /// Side effects: stashes the code on `StreamState.pending_reset` and wakes the driver.
268 /// A no-op if the stream is already gone from the shared map — that happens when the
269 /// driver has already closed the stream for its own reasons. Idempotent; only the first
270 /// call takes effect, subsequent calls see the slot still filled and do nothing.
271 pub(crate) fn stream_error(&self, stream_id: u32, code: super::H2ErrorCode) {
272 let Some(stream) = self.streams_lock().get(&stream_id).cloned() else {
273 return;
274 };
275 let mut slot = stream
276 .pending_reset
277 .lock()
278 .expect("pending_reset mutex poisoned");
279 if slot.is_none() {
280 *slot = Some(code);
281 drop(slot);
282 stream.needs_servicing.store(true, Ordering::Release);
283 self.outbound_waker.wake();
284 }
285 }
286
287 /// Bind this `H2Connection` to a TCP transport and return an [`H2Driver`] that drives
288 /// the connection.
289 ///
290 /// The driver must be polled to completion via repeated calls to
291 /// [`H2Driver::next`] (or its [`Stream`][futures_lite::stream::Stream] impl); each returned
292 /// [`Conn`] should be spawned on its own task.
293 pub fn run<T>(self: Arc<Self>, transport: T) -> H2Driver<T>
294 where
295 T: AsyncRead + AsyncWrite + Unpin + Send,
296 {
297 H2Driver::new(self, transport, Role::Server)
298 }
299
300 /// Bind this `H2Connection` to an outbound transport and return an [`H2Initiator`] —
301 /// the background-task future a client spawns to drive the connection.
302 ///
303 /// On first poll the driver writes the 24-byte RFC 9113 §3.4 client preface and its
304 /// initial SETTINGS; thereafter it demuxes inbound frames (peer SETTINGS, response
305 /// HEADERS / DATA on our streams, etc.) and pumps outbound bytes (new stream opens,
306 /// DATA, `WINDOW_UPDATEs`) until the connection closes or errors out.
307 ///
308 /// Awaiting the returned future resolves with `Ok(())` on graceful close or
309 /// `Err(H2Error)` on protocol / I/O failure. Streams are not opened via the future
310 /// itself — client code calls stream-open primitives on `H2Connection` (introduced
311 /// in a later phase); this future just runs the framing loop.
312 #[cfg(feature = "unstable")]
313 pub fn run_client<T>(self: Arc<Self>, transport: T) -> H2Initiator<T>
314 where
315 T: AsyncRead + AsyncWrite + Unpin + Send,
316 {
317 H2Initiator::new(H2Driver::new(self, transport, Role::Client))
318 }
319
320 /// Per-stream entry point — call from the runtime adapter's spawned task for each
321 /// [`Conn`] returned by [`H2Driver::next`]. Runs `handler` to produce the response,
322 /// then `send_h2` to hand the framed response to the driver.
323 ///
324 /// Mirrors [`H3Connection::process_inbound_bidi`][crate::h3::H3Connection::process_inbound_bidi]'s
325 /// role for h3, except the Conn is already built (the acceptor decoded HEADERS and
326 /// validated the request before emitting), so this just runs the handler chain and
327 /// sends.
328 ///
329 /// # Errors
330 ///
331 /// Returns the [`io::Error`] from `send_h2` if the body's `poll_read` errors or the
332 /// underlying transport fails partway through the response.
333 pub async fn process_inbound<Transport, Handler, Fut>(
334 conn: Conn<Transport>,
335 handler: Handler,
336 ) -> io::Result<Conn<Transport>>
337 where
338 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
339 Handler: FnOnce(Conn<Transport>) -> Fut,
340 Fut: Future<Output = Conn<Transport>>,
341 {
342 handler(conn).await.send_h2().await
343 }
344}