trillium_http/h2/acceptor.rs
1//! HTTP/2 driver loop ([`H2Driver`]) — owns the per-connection TCP transport and runs the
2//! poll-based state machine that demuxes frames, dispatches stream-opens to handler tasks, and
3//! pumps responses back out.
4//!
5//! Created by [`H2Connection::run`]. The runtime adapter calls [`H2Driver::next`] in a
6//! loop (or drives via the [`Stream`] impl, which has the same semantics); each yield either
7//! returns the next opened request stream (a [`Conn`] for the runtime to spawn a handler
8//! task against) or `None` when the connection is closed.
9//!
10//! The driver is a poll-based state machine, not an async fn. A single `drive` call is the
11//! unit of forward progress: it picks up conn-task signals, advances any in-flight response
12//! sends, drains pending outbound bytes, and advances the read cursor — parking with
13//! cancel-safe partial state when no further progress can be made.
14//!
15//! # Module layout
16//!
17//! Driver impl is split across this file and child modules to keep each focused:
18//!
//! - **`acceptor.rs`** (this file): struct definition, the [`H2Driver::drive`] orchestration loop, I/O
20//! read primitives (`poll_fill_to`, `poll_drain_peer`), and the supporting enums
21//! ([`DriverState`], [`ReadPhase`], [`CloseOutcome`], [`Action`], [`StreamEntry`]).
22//! - **`acceptor::closed_streams`**: bounded ledger of recently-closed streams + reasons, consulted
23//! to pick the right §5.1 error category for stale peer frames.
24//! - **`acceptor::handler_signals`**: conn-task → driver work-pickup boundary. Owns the
25//! `needs_servicing` mailbox protocol — `service_handler_signals`, `pick_up_new_client_streams`,
26//! `has_pending_handler_signals`.
27//! - **`acceptor::outbound`**: outbound write/flush plumbing and `queue_*` frame helpers.
28//! - **`acceptor::recv`**: receive side — frame reader, dispatch, HEADERS+CONTINUATION
29//! accumulation, malformed-request `RST_STREAM`, DATA routing into per-stream recv rings.
30//! - **`acceptor::send`**: send pump — picks up [`SendCursor`][send::SendCursor]s from the
31//! conn-task signal pickup, frames HEADERS / DATA / trailing-HEADERS, signals completion.
32//!
33//! [`H2Connection::run`]: super::H2Connection::run
34//! [`Stream`]: futures_lite::stream::Stream
35
36mod closed_streams;
37mod constants;
38mod handler_signals;
39mod outbound;
40mod recv;
41mod send;
42mod types;
43
44use super::{
45 H2Error, H2ErrorCode, connection::H2Connection, frame::FRAME_HEADER_LEN, role::Role,
46 transport::H2Transport,
47};
48use crate::{
49 Conn,
50 headers::hpack::{HpackDecoder, HpackEncoder},
51};
52use closed_streams::{ClosedReason, ClosedStreams};
53use constants::{
54 INITIAL_CONNECTION_RECV_WINDOW, MAX_BUFFER_SIZE, MAX_DATA_CHUNK_SIZE, MAX_FLOW_CONTROL_WINDOW,
55};
56use futures_lite::io::{AsyncRead, AsyncWrite};
57use hashbrown::HashMap;
58use recv::PendingHeaders;
59use std::{
60 future::Future,
61 io,
62 pin::Pin,
63 sync::Arc,
64 task::{Context, Poll, ready},
65};
66use swansong::ShuttingDown;
67use types::{
68 AcceptorConfig, Action, CloseOutcome, DriverState, Next, ReadPhase, StreamEntry, frame_slice,
69};
70
/// Owns the per-connection TCP transport and drives the HTTP/2 demux loop.
///
/// See the module docs for the high-level driver shape and how its impl is split across the
/// `recv` and `send` child modules.
#[derive(Debug)]
pub struct H2Driver<T> {
    /// Shared connection-level state. Handler tasks also hold this and wake the driver
    /// through its `outbound_waker` (see `park`).
    connection: Arc<H2Connection>,

    /// The owned bidirectional byte transport for this connection (per the module docs,
    /// the per-connection TCP stream).
    transport: T,

    /// Role this driver runs in — see [`Role`]. Consulted at role-asymmetric branch points
    /// (preface direction, HEADERS-on-unknown-id, HEADERS-on-known-id).
    role: Role,

    /// Overall lifecycle position of the driver.
    state: DriverState,

    /// Future that resolves when the shared `Swansong` begins shutdown. Polled each
    /// `drive` tick while the driver is running; on resolution the driver queues a
    /// GOAWAY and transitions to `Closing`, after which the top-of-loop guard returns
    /// early and we never poll this again on the same acceptor.
    shutting_down: ShuttingDown,

    /// Inbound byte cursor. Accumulates bytes from the transport across `drive` calls so
    /// a partial frame read can survive a return to `Poll::Pending`. Always contains
    /// exactly the bytes of the current frame being accumulated (header, then payload);
    /// reset after each complete frame is dispatched.
    read_buf: Vec<u8>,
    read_filled: usize,
    read_phase: ReadPhase,

    /// Outbound byte cursor. The driver encodes control frames into `write_buf` and drains
    /// to the transport via `poll_flush_outbound`. `write_cursor` is the offset of the
    /// first byte not yet accepted by `poll_write`. After the buffer fully drains, both
    /// fields are reset and a flush is issued.
    write_buf: Vec<u8>,
    write_cursor: usize,
    write_flush_pending: bool,

    /// HPACK decoder state, shared across all header blocks on this connection.
    hpack: HpackDecoder,

    /// HPACK encoder state. The driver is the sole owner — handlers / conn tasks
    /// no longer touch it, so this is a plain field with no synchronization.
    hpack_encoder: HpackEncoder,

    /// Per-stream state, keyed by stream id. Driver-only — handler tasks hold their own
    /// `Arc<StreamState>` via [`H2Transport`] and don't consult this table. The entry
    /// bundles the shared state with driver-private bookkeeping (e.g. "have we already
    /// advertised the recv window after seeing `is_reading`?").
    streams: HashMap<u32, StreamEntry>,

    /// Highest peer-initiated stream id seen so far. Peer-initiated (client) stream ids
    /// must be odd and strictly increasing per RFC 9113 §5.1.1.
    last_peer_stream_id: u32,

    /// Accumulator for an in-progress HEADERS block that is waiting on further CONTINUATION
    /// frames. `None` outside a HEADERS block. §6.10 forbids any frame on any stream from
    /// interleaving while this is `Some`.
    pending_headers: Option<PendingHeaders>,

    /// Set once the driver decides to close: graceful (peer GOAWAY / server swansong / peer
    /// EOF) or erroring (protocol violation → GOAWAY with code, or I/O failure → no
    /// GOAWAY). `drive` completes (returns `None` or a final `Some(Err(...))`) once
    /// outbound drains to empty.
    close_outcome: Option<CloseOutcome>,

    /// Set after `drive` yields its terminal result. Subsequent calls return `None` without
    /// touching the transport.
    finished: bool,

    /// Reusable scratch the send pump reads body chunks into before framing as DATA.
    /// Sized at [`MAX_DATA_CHUNK_SIZE`] — even if the peer permits larger frames we cap our
    /// DATA emissions here to bound per-connection memory.
    body_scratch: Vec<u8>,

    /// Connection-level send flow-control window (RFC 9113 §6.9). Tracked as [`i64`] so
    /// mid-connection `INITIAL_WINDOW_SIZE` reductions can drive per-stream windows
    /// temporarily negative (§6.9.2); the connection window is kept as [`i64`] too, for
    /// symmetry with the per-stream windows, even though the connection window itself is
    /// *not* affected by `SETTINGS_INITIAL_WINDOW_SIZE`.
    /// Decremented as we emit DATA; incremented by peer `WINDOW_UPDATE(stream_id=0, inc)`.
    /// Overflow past [`MAX_FLOW_CONTROL_WINDOW`] is a connection-level `FLOW_CONTROL_ERROR`.
    connection_send_window: i64,

    /// Connection-level recv flow-control window. Starts at the RFC 9113 §6.9.2 baseline of
    /// 65535 octets ([`INITIAL_CONNECTION_RECV_WINDOW`]) and is raised to the configured
    /// `AcceptorConfig::initial_connection_window_size` via an initial
    /// `WINDOW_UPDATE(0)` right after SETTINGS — §6.9.2 forbids SETTINGS from altering it,
    /// so WU is the only path. Decremented as peer DATA frames arrive (across all streams);
    /// incremented as the handler-task-side consumption signal is picked up and we emit
    /// `WINDOW_UPDATE(0, consumed)`. A negative value means the peer overran the window —
    /// connection-level `FLOW_CONTROL_ERROR`.
    connection_recv_window: i64,

    /// Bounded ledger of recently-closed streams and why they closed. Consulted by
    /// [`recv::H2Driver::finalize_headers`] when a HEADERS frame arrives on an id ≤
    /// `last_peer_stream_id` that's not in the active map, to distinguish `RST_STREAM`-
    /// closed (stream-level `STREAM_CLOSED`) from `END_STREAM`-closed or never-opened
    /// (connection-level). See [`ClosedStreams`] for the eviction policy.
    closed_streams: ClosedStreams,

    /// Snapshot of the h2-relevant fields of [`HttpConfig`][crate::HttpConfig] taken at
    /// acceptor construction. Copied in because `HttpConfig` is per-server but an acceptor
    /// is per-connection — the config is effectively immutable over a connection's
    /// lifetime, and a local copy avoids reaching through [`H2Connection::context`] on
    /// every policy check.
    ///
    /// [`H2Connection::context`]: super::H2Connection::context
    pub(super) config: AcceptorConfig,
}
179
180impl<T> H2Driver<T>
181where
182 T: AsyncRead + AsyncWrite + Unpin + Send,
183{
184 pub(super) fn new(connection: Arc<H2Connection>, transport: T, role: Role) -> Self {
185 let shutting_down = connection.swansong().shutting_down();
186 let context = connection.context();
187 let config = AcceptorConfig::from_http_config(context.config());
188 let hpack_encoder = HpackEncoder::new(
189 context.observer.clone(),
190 context.config.dynamic_table_capacity(),
191 context.config.recent_pairs_size(),
192 );
193 Self {
194 connection,
195 transport,
196 role,
197 state: DriverState::AwaitingPreface,
198 shutting_down,
199 read_buf: vec![0u8; FRAME_HEADER_LEN],
200 read_filled: 0,
201 read_phase: ReadPhase::NeedHeader,
202 write_buf: Vec::new(),
203 write_cursor: 0,
204 write_flush_pending: false,
205 hpack: HpackDecoder::new(config.hpack_table_capacity()),
206 hpack_encoder,
207 streams: HashMap::new(),
208 last_peer_stream_id: 0,
209 pending_headers: None,
210 close_outcome: None,
211 finished: false,
212 body_scratch: vec![0u8; MAX_DATA_CHUNK_SIZE as usize],
213 connection_send_window: INITIAL_CONNECTION_RECV_WINDOW,
214 connection_recv_window: INITIAL_CONNECTION_RECV_WINDOW,
215 closed_streams: ClosedStreams::default(),
216 config,
217 }
218 }
219
220 /// The shared [`H2Connection`] this acceptor was created from.
221 pub fn connection(&self) -> &Arc<H2Connection> {
222 &self.connection
223 }
224
225 /// Drive the connection until the next request stream opens, the connection ends, or a
226 /// fatal protocol or I/O error occurs.
227 ///
228 /// Returns `Ok(Some(conn))` for each new request stream — the runtime adapter is
229 /// expected to spawn a handler task that consumes the [`Conn`]. Malformed requests
230 /// (RFC 9113 §8.1.2) are handled internally with a stream-level `RST_STREAM` and never
231 /// surfaced. Returns `Ok(None)` when the connection has been shut down cleanly (peer
232 /// GOAWAY, our own swansong shutdown, peer EOF at a frame boundary).
233 ///
234 /// # Errors
235 ///
236 /// The returned future resolves to an [`H2Error`] for any *connection-level* protocol
237 /// violation detected while decoding peer frames or for an unrecoverable transport I/O
238 /// error. A final GOAWAY is sent before a protocol error is returned (best-effort; I/O
239 /// errors skip it).
240 // Mirrors `StreamExt::next` (a `&mut self -> impl Future<Output = Option<T>>` adapter),
241 // not `Iterator::next`. The driver is also `Stream`, so callers can use either.
242 #[allow(clippy::should_implement_trait)]
243 pub fn next(&mut self) -> Next<'_, T> {
244 Next { driver: self }
245 }
246
    /// Poll-based driver core. Shared by [`Next`]'s `Future` impl, the [`Stream`] impl on
    /// [`H2Driver`], and [`H2Initiator`][super::H2Initiator]'s client-side Future impl.
    ///
    /// [`Stream`]: futures_lite::stream::Stream
    pub(super) fn drive(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Conn<H2Transport>, H2Error>>> {
        // Terminal latch: once the final result has been yielded, re-polls return `None`
        // without touching the transport.
        if self.finished {
            return Poll::Ready(None);
        }

        // Each iteration is one full round of forward progress; the bound forces a
        // cooperative yield (see the tail of this fn) so a busy connection can't starve
        // the runtime.
        for loop_number in 0..self.config.copy_loops_per_yield() {
            log::trace!("h2 drive loop number: {loop_number}");
            // 1. Conn-task signals. Picks up window-update intent (`is_reading`) and new
            // `submit_send` submissions, moving them into driver-private state.
            self.service_handler_signals();

            // 2. Send pump. Turns picked-up SendCursors into HEADERS / DATA / trailing-HEADERS
            // frame bytes in `write_buf`. Body reads that return Pending leave the cursor in
            // place — the body's source will wake the driver task.
            self.advance_outbound_sends(cx);

            // 3. Flush any pending outbound — never re-poll reads when we still owe bytes to the
            // peer, and never signal closure to the caller before the wire is clean.
            match self.poll_flush_outbound(cx) {
                Poll::Ready(Ok(())) => {}
                Poll::Ready(Err(e)) => {
                    // Flush failure while closing: just take whatever outcome we had and
                    // shelve the fresh I/O error. While running, record and finish.
                    if self.close_outcome.is_none() {
                        self.close_outcome = Some(CloseOutcome::Io(e));
                    }
                    return Poll::Ready(self.finish_with_current_outcome());
                }
                Poll::Pending => return Poll::Pending,
            }

            // 4. If we were closing, outbound is now drained. For graceful (or protocol-error)
            // shutdowns, transition to `Drained` and wait for the peer to close its write half —
            // otherwise the peer sees our drop as a reset rather than a clean close. For
            // I/O-error shutdowns the transport is already untrustworthy, so skip the drain.
            if self.state == DriverState::Closing {
                if matches!(self.close_outcome, Some(CloseOutcome::Io(_))) {
                    return Poll::Ready(self.finish_with_current_outcome());
                }
                self.state = DriverState::Drained;
            }

            // 5. Server-initiated shutdown check. Only relevant while we're running — once we're
            // past the Closing/Drained transition we've already committed to a close and
            // re-observing the swansong here would re-enter begin_close in a loop. Post-shutdown
            // re-polls of `ShuttingDown` are harmless themselves (event_listener-backed, not
            // single-shot) but the re-entry isn't.
            if self.state == DriverState::Running
                && Pin::new(&mut self.shutting_down).poll(cx).is_ready()
            {
                self.begin_close(CloseOutcome::Graceful);
                continue;
            }

            // 6. State-specific step.
            match self.state {
                DriverState::AwaitingPreface => {
                    // Role-asymmetric: server reads the 24-byte preface off the wire; client
                    // writes it to `write_buf` (the next drain tick flushes it, then our
                    // SETTINGS, then the peer's SETTINGS arrives as the first frame in Running).
                    let poll = match self.role {
                        Role::Server => self.poll_read_preface(cx),
                        Role::Client => {
                            self.queue_client_preface();
                            Poll::Ready(Ok(()))
                        }
                    };
                    match poll {
                        Poll::Ready(Ok(())) => self.state = DriverState::NeedsServerSettings,
                        Poll::Ready(Err(e)) => {
                            self.close_outcome = Some(e);
                            return Poll::Ready(self.finish_with_current_outcome());
                        }
                        Poll::Pending => {
                            // Only park if no handler work raced in; otherwise loop again.
                            if self.park(cx) {
                                return Poll::Pending;
                            }
                        }
                    }
                }

                DriverState::NeedsServerSettings => {
                    self.queue_settings();
                    // §6.9.2 forbids SETTINGS from altering the connection-level flow-control
                    // window — it stays at the 65535 RFC baseline unless we raise it via
                    // `WINDOW_UPDATE(0)`. Do that immediately after SETTINGS so peer bulk
                    // uploads aren't capped at ~5 Mbit/s × RTT.
                    let raise = i64::from(self.config.initial_connection_window_size())
                        - INITIAL_CONNECTION_RECV_WINDOW;
                    if raise > 0 {
                        let raise = u32::try_from(raise).unwrap_or(u32::MAX);
                        self.queue_window_update(0, raise);
                        self.connection_recv_window += i64::from(raise);
                    }
                    self.state = DriverState::Running;
                }

                DriverState::Running => match self.poll_advance_read(cx) {
                    Poll::Ready(Ok(Action::Continue)) => {}
                    Poll::Ready(Ok(Action::Emit(conn))) => {
                        return Poll::Ready(Some(Ok(*conn)));
                    }
                    Poll::Ready(Ok(Action::Close(outcome))) => {
                        self.begin_close(outcome);
                    }
                    // Protocol errors need a GOAWAY on the wire before we terminate;
                    // `begin_close` queues that and transitions us to Closing so the next
                    // outer-loop iteration drains the frame. Io errors short-circuit with
                    // no GOAWAY (`begin_close` already skips queuing for those).
                    Poll::Ready(Err(e)) => {
                        self.begin_close(e);
                    }
                    Poll::Pending => {
                        if self.park(cx) {
                            return Poll::Pending;
                        }
                    }
                },

                DriverState::Closing => unreachable!("handled above once write_buf is drained"),

                DriverState::Drained => match self.poll_drain_peer(cx) {
                    Poll::Ready(()) => {
                        return Poll::Ready(self.finish_with_current_outcome());
                    }
                    Poll::Pending => return Poll::Pending,
                },
            }
        }

        // Cooperative yield: we made `copy_loops_per_yield` rounds of progress without
        // hitting an internal Pending. Re-arm immediately and let the runtime pick up
        // anything else it has waiting before we resume.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
390
391 /// Register the driver's waker with the shared `outbound_waker` (so handler tasks can
392 /// wake the driver) and tell the caller whether it's safe to park. Returns `true` if
393 /// the driver should return `Poll::Pending`, or `false` if a handler produced work
394 /// between our last check and the registration — in which case the caller should loop
395 /// around to pick it up.
396 fn park(&mut self, cx: &mut Context<'_>) -> bool {
397 self.connection.outbound_waker().register(cx.waker());
398 !self.has_pending_handler_signals() && !self.has_pending_outbound_progress()
399 }
400
401 /// Convert the current `close_outcome` into the terminal return of [`Self::drive`]. Must
402 /// only be called after outbound bytes have been flushed. Graceful closes return `None`;
403 /// errors surface as a final `Some(Err(...))` before subsequent polls return `None`.
404 fn finish_with_current_outcome(&mut self) -> Option<Result<Conn<H2Transport>, H2Error>> {
405 self.finished = true;
406 // Complete every outstanding `H2Connection::send_ping` future with an error so
407 // awaiting callers don't block forever. Safe to call regardless of outcome —
408 // a no-op if no pings are in flight.
409 self.connection.fail_pending_pings(
410 io::ErrorKind::ConnectionAborted,
411 "h2 connection closed before PING ACK",
412 );
413 // Wake any `PeerSettings` waiters so a peer that disconnects without ever sending
414 // SETTINGS doesn't strand them. Their `poll` rechecks swansong state and returns
415 // Ready; the caller's follow-up operation surfaces the connection-closed error.
416 self.connection.wake_peer_settings_waiters();
417 match self.close_outcome.take() {
418 None | Some(CloseOutcome::Graceful) => None,
419 Some(CloseOutcome::Protocol(code)) => Some(Err(H2Error::Protocol(code))),
420 Some(CloseOutcome::Io(e)) => Some(Err(H2Error::Io(e))),
421 }
422 }
423
424 /// Enter the closing state: record the outcome and queue a GOAWAY (only for outcomes
425 /// that warrant one). The main loop will drain `write_buf` and then finish.
426 fn begin_close(&mut self, outcome: CloseOutcome) {
427 log::trace!("h2 driver: begin_close({outcome:?})");
428 // Don't overwrite a prior outcome (e.g. if an error fires in the middle of a
429 // graceful shutdown, keep the error).
430 let code = match &outcome {
431 CloseOutcome::Graceful => Some(H2ErrorCode::NoError),
432 CloseOutcome::Protocol(code) => Some(*code),
433 CloseOutcome::Io(_) => None,
434 };
435 if self.close_outcome.is_none() {
436 self.close_outcome = Some(outcome);
437 }
438 if let Some(code) = code {
439 self.queue_goaway(self.last_peer_stream_id, code);
440 }
441 self.state = DriverState::Closing;
442 }
443
444 /// Read bytes from the transport into `read_buf[read_filled..target]` until
445 /// `read_filled >= target`. Cancel-safe: if the caller drops the Future, any bytes
446 /// already placed are preserved in the buffer.
447 ///
448 /// A 0-byte read is surfaced as `UnexpectedEof`. The caller maps this to a terminal
449 /// I/O error; we don't emit a GOAWAY on peer-initiated close (consistent with the pre-
450 /// poll driver).
451 fn poll_fill_to(&mut self, target: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
452 if self.read_buf.len() < target {
453 self.read_buf.resize(target, 0);
454 }
455 while self.read_filled < target {
456 let n = ready!(
457 Pin::new(&mut self.transport)
458 .poll_read(cx, &mut self.read_buf[self.read_filled..target])
459 )?;
460 if n == 0 {
461 return Poll::Ready(Err(io::Error::from(io::ErrorKind::UnexpectedEof)));
462 }
463 self.read_filled += n;
464 }
465 Poll::Ready(Ok(()))
466 }
467
468 /// Post-GOAWAY, drain whatever inbound bytes are *immediately* available from the
469 /// peer so our Drop sends a clean FIN (no unread data → no TCP RST) while the peer
470 /// sees the GOAWAY we just emitted. Read loops internally: consume each Ready chunk,
471 /// discard it, ask for more. Exits as soon as the transport returns `Pending` (no
472 /// bytes available right now) OR `Ready(0)` (peer FIN already arrived) OR any error.
473 ///
474 /// Does **not** register the waker on `Pending` — we're actively closing, not
475 /// observing the peer. A peer that happens to send more bytes after our exit will
476 /// have those bytes dropped when the transport is closed; that's a race the peer
477 /// chose to lose by sending after receiving our GOAWAY.
478 ///
479 /// Returning `Ready(())` unconditionally (no `Pending` case) lets the caller finalize
480 /// immediately. The `Poll` wrapper is kept for symmetry with the rest of the driver's
481 /// poll-style methods.
482 fn poll_drain_peer(&mut self, cx: &mut Context<'_>) -> Poll<()> {
483 // A peer flooding us with bytes could keep this loop going a long time. Cap it
484 // so a pathological client can't pin our close-out forever.
485 const MAX_DISCARD_ITERATIONS: usize = 256;
486 // Lightweight scratch — we're throwing it away. 512 balances "drain in few
487 // iterations" against "don't hold a large buffer for a rare path."
488 let mut scratch = [0u8; 512];
489 for _ in 0..MAX_DISCARD_ITERATIONS {
490 // We pass `cx` through for the benefit of the transport's `poll_read` contract,
491 // but we *interpret* `Pending` as "done draining" rather than parking on it —
492 // we're actively closing, not observing. A peer that sends more bytes after
493 // our exit loses the race.
494 match Pin::new(&mut self.transport).poll_read(cx, &mut scratch) {
495 Poll::Ready(Ok(0) | Err(_)) | Poll::Pending => {
496 return Poll::Ready(());
497 }
498 Poll::Ready(Ok(_)) => {}
499 }
500 }
501 Poll::Ready(())
502 }
503
504 /// Look up why a stream is closed. `None` means either never-opened or evicted from the
505 /// bounded ledger — both fall through to the connection-level §5.1.1 default.
506 pub(super) fn closed_reason(&self, stream_id: u32) -> Option<ClosedReason> {
507 self.closed_streams.reason(stream_id)
508 }
509}