trillium_http/h2/transport.rs
1//! Per-stream transport handed to handler tasks.
2//!
3//! [`H2Transport`] is the [`AsyncRead`] + [`AsyncWrite`] view of a single HTTP/2 stream. It is
4//! carried on the emitted [`Conn`][crate::Conn] returned from [`H2Driver::next`], and the
5//! runtime adapter spawns a handler task that consumes it. The transport never touches the
6//! underlying TCP connection directly — all I/O coordinates through shared per-stream state
7//! on the [`H2Connection`] driven by the driver task.
8//!
9//! Two paths reach the impls:
10//!
11//! - **Normal HTTP/2 request/response**: handlers usually don't touch [`H2Transport`] directly
12//! (same sharp edge h1 and h3 document). [`ReceivedBody`][crate::ReceivedBody] reads request body
13//! bytes through the transport's `AsyncRead` via
14//! [`ReceivedBody::handle_h2_data`][crate::ReceivedBody::handle_h2_data]. Response bytes flow
15//! through [`H2Connection::submit_send`][submit_send] to the driver's send pump, which frames
16//! HEADERS + DATA + trailing HEADERS onto the connection without ever touching this `AsyncWrite`.
17//!
18//! - **Extended-CONNECT upgrades** ([RFC 8441] WebSocket-over-h2, plus the in-progress
19//! `draft-ietf-webtrans-http2` for WebTransport-over-h2): after the handler responds 200 to a
20//! `CONNECT` request with a `:protocol` pseudo-header, [`Conn::send_h2`][crate::Conn::send_h2]
21//! routes through [`H2Connection::submit_upgrade`][submit_upgrade] which frames HEADERS without
22//! `END_STREAM`, signals send completion early, and leaves the stream open as a bidirectional
23//! byte channel. The runtime adapter then dispatches
24//! [`Handler::upgrade`][trillium::Handler::upgrade], which gets an [`Upgrade`][crate::Upgrade]
25//! wrapping this transport. `AsyncWrite::poll_write` appends to a per-stream outbound queue
26//! ([`SendState::outbound`]); the driver's send pump drains it into DATA frames bounded by the
27//! per-stream and connection send windows. `AsyncWrite::poll_close` flips
28//! [`SendState::outbound_close_requested`] so the driver eventually emits `DATA(END_STREAM)` and
29//! tears the stream down.
30//!
31//! [`H2Driver::next`]: super::H2Driver::next
32//! [`H2Connection`]: super::H2Connection
33//! [`BoxedTransport`]: crate::transport::BoxedTransport
34//! [submit_send]: super::H2Connection::submit_send
35//! [submit_upgrade]: super::H2Connection::submit_upgrade
36//! [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
37
38use super::{H2Connection, H2ErrorCode};
39use crate::{
40 Body, Buffer, Headers,
41 headers::hpack::{FieldSection, PseudoHeaders},
42};
43use atomic_waker::AtomicWaker;
44use futures_lite::io::{AsyncRead, AsyncWrite};
45use std::{
46 fmt, io,
47 pin::Pin,
48 sync::{
49 Arc, Mutex,
50 atomic::{AtomicBool, AtomicU64, Ordering},
51 },
52 task::{Context, Poll},
53};
54
55/// A single HTTP/2 stream's transport handle.
56///
57/// Carries a backref to the shared [`H2Connection`], the stream id, and the per-stream
58/// `Arc<StreamState>` used by the read side. Normal HTTP/2 operation reads through
59/// [`ReceivedBody`][crate::ReceivedBody] and writes through the connection's send queue;
60/// the `AsyncRead` / `AsyncWrite` impls here are only reached by code that borrows the
61/// transport directly (typically an upgrade handler after extended CONNECT).
62pub struct H2Transport {
63 connection: Arc<H2Connection>,
64 stream_id: u32,
65 state: Arc<StreamState>,
66}
67
68impl fmt::Debug for H2Transport {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 f.debug_struct("H2Transport")
71 .field("stream_id", &self.stream_id)
72 .finish_non_exhaustive()
73 }
74}
75
76impl H2Transport {
77 /// Create a transport for a stream that has just been opened by the driver.
78 pub(super) fn new(
79 connection: Arc<H2Connection>,
80 stream_id: u32,
81 state: Arc<StreamState>,
82 ) -> Self {
83 Self {
84 connection,
85 stream_id,
86 state,
87 }
88 }
89
90 /// The stream identifier this transport is bound to.
91 pub fn stream_id(&self) -> u32 {
92 self.stream_id
93 }
94
95 /// The shared [`H2Connection`] backing this stream.
96 pub fn connection(&self) -> &Arc<H2Connection> {
97 &self.connection
98 }
99}
100
101impl Drop for H2Transport {
102 /// Application-side release / cancel signal, depending on stream state:
103 ///
104 /// - **Wire-closed cleanly** (`send.completed && recv.eof`): the application is done with a
105 /// stream that already finished on the wire. The client-role lifecycle keeps such streams in
106 /// the map after wire-close (see [`H2Driver::try_close_if_both_done`][super::H2Driver]) so
107 /// the application's transport handle remains valid for trailer / late-read access. Dropping
108 /// the transport is the signal that the application is done, and we forward it to the
109 /// connection so the driver removes the entry from both maps.
110 ///
111 /// - **Wire-incomplete** (handler panic, conn task abandoned, client awaiting a response that
112 /// never came): emit `RST_STREAM(Cancel)` so the peer learns we're abandoning the stream.
113 /// Without this the leak persists until the entire connection tears down. Symmetric for both
114 /// roles.
115 ///
116 /// - **Already gone from the shared map**: driver beat us to cleanup; no-op.
117 ///
118 /// - **Upgrade path graceful close in flight** (`outbound_close_requested`): user has already
119 /// asked for graceful close via [`Self::poll_close`]; the driver is draining the outbound
120 /// queue + emitting `DATA(END_STREAM)`. Don't RST in that window.
121 fn drop(&mut self) {
122 // Cheap pre-check: if the stream is no longer in the shared map the driver has
123 // already cleaned up; nothing to do.
124 if !self.connection.streams_lock().contains_key(&self.stream_id) {
125 return;
126 }
127 let send_done = self.state.send.completed.load(Ordering::Acquire);
128 let recv_done = self.state.recv.eof.load(Ordering::Acquire);
129 if send_done && recv_done {
130 log::trace!(
131 "h2 stream {}: H2Transport dropped on wire-closed stream — releasing",
132 self.stream_id,
133 );
134 self.connection.release_stream(self.stream_id);
135 return;
136 }
137 // Upgrade path graceful close in flight — let the driver finish.
138 if self
139 .state
140 .send
141 .outbound_close_requested
142 .load(Ordering::Acquire)
143 {
144 return;
145 }
146 log::debug!(
147 "h2 stream {}: H2Transport dropped mid-stream — RST_STREAM(Cancel) \
148 (send_done={send_done}, recv_done={recv_done})",
149 self.stream_id,
150 );
151 self.connection
152 .stream_error(self.stream_id, H2ErrorCode::Cancel);
153 }
154}
155
156impl AsyncRead for H2Transport {
157 fn poll_read(
158 self: Pin<&mut Self>,
159 cx: &mut Context<'_>,
160 out: &mut [u8],
161 ) -> Poll<io::Result<usize>> {
162 if out.is_empty() {
163 return Poll::Ready(Ok(0));
164 }
165
166 // The first `poll_read` is the handler's declaration of intent to consume the request
167 // body — until this point, we've advertised a zero recv window and the peer has sent
168 // nothing beyond HEADERS. Tell the driver to top up our per-stream window now. Later
169 // calls CAS-fail silently and don't re-signal.
170 let recv_state = &self.state.recv;
171 let connection = &*self.connection;
172 if !recv_state.is_reading.swap(true, Ordering::AcqRel) {
173 self.state.needs_servicing.store(true, Ordering::Release);
174 connection.outbound_waker().wake();
175 }
176
177 let mut recv = recv_state.buf.lock().expect("recv buf mutex poisoned");
178
179 // Copy as many bytes as fit from the front of the ring into `out`, then advance the
180 // ring's virtual read cursor. `Buffer::ignore_front` truncates the underlying `Vec` to
181 // zero when we drain fully, so capacity stays bounded by peak in-flight bytes rather
182 // than cumulative traffic.
183 let take = out.len().min(recv.len());
184 if take > 0 {
185 out[..take].copy_from_slice(&recv[..take]);
186 recv.ignore_front(take);
187 // Drop the buf lock before the waker fire so the driver can grab it without
188 // contention when it wakes.
189 drop(recv);
190 // Tell the driver how many bytes the handler consumed so it can emit a matching
191 // `WINDOW_UPDATE` and keep the peer's stream + connection windows topped up.
192 // `fetch_add` accumulates across calls that happen before the driver's next
193 // service tick; the driver's `swap(0)` takes the whole batch at once.
194 recv_state
195 .bytes_consumed
196 .fetch_add(take as u64, Ordering::AcqRel);
197 self.state.needs_servicing.store(true, Ordering::Release);
198 connection.outbound_waker().wake();
199 return Poll::Ready(Ok(take));
200 }
201
202 // Buffer empty. EOF if END_STREAM was observed, otherwise register and wait.
203 // The driver acquires the same `buf` lock to push data and to set `eof`, so holding
204 // it here is enough to make the eof check final — no register-then-check race window
205 // between us and the driver's wake.
206 if recv_state.eof.load(Ordering::Acquire) {
207 return Poll::Ready(Ok(0));
208 }
209 recv_state.waker.register(cx.waker());
210 Poll::Pending
211 }
212}
213
214impl AsyncWrite for H2Transport {
215 fn poll_write(
216 self: Pin<&mut Self>,
217 cx: &mut Context<'_>,
218 buf: &[u8],
219 ) -> Poll<io::Result<usize>> {
220 // Append into the per-stream outbound queue used by the extended-CONNECT
221 // (RFC 8441) upgrade path. The driver's send pump drains the same queue (via
222 // the upgrade body's `AsyncRead::poll_read`) into DATA frames bounded by
223 // per-stream + connection send windows.
224 //
225 // Bounded by `config.response_buffer_max_len` — the same cap h1 and h3 response
226 // paths use for their transit buffers. If the peer's flow-control window stalls
227 // (slow or malicious reader) the driver can't drain `outbound`, the cap is hit,
228 // and we return `Pending` so the handler is throttled. The drain side
229 // (`H2OutboundReader::poll_read`) wakes `outbound_write_waker` after each take.
230 let send = &self.state.send;
231
232 if send.outbound_close_requested.load(Ordering::Acquire) {
233 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
234 }
235
236 let cap = self.connection.context.config.response_buffer_max_len;
237 let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
238 if outbound.len() >= cap {
239 // Register first, then re-check under lock to close the race against the
240 // drain side (`H2OutboundReader::poll_read` takes the same lock to call
241 // `ignore_front` and then wakes us). If a drain landed between our length
242 // check and the register, the second check sees the freed space.
243 send.outbound_write_waker.register(cx.waker());
244 if outbound.len() >= cap {
245 return Poll::Pending;
246 }
247 }
248 let take = (cap - outbound.len()).min(buf.len());
249 log::trace!(
250 "h2 stream {}: H2Transport::poll_write appending {take}/{} bytes to outbound queue",
251 self.stream_id,
252 buf.len(),
253 );
254 outbound.extend_from_slice(&buf[..take]);
255 drop(outbound);
256
257 // Wake the driver task (if parked on the connection-level waker) and the
258 // upgrade body's poll_read (in case it's registered between driver ticks).
259 // Firing both is cheap and resolves the cross-task race where the driver
260 // happens to be parked on `connection.outbound_waker` rather than mid-body-poll.
261 send.outbound_waker.wake();
262 self.connection.outbound_waker().wake();
263 Poll::Ready(Ok(take))
264 }
265
266 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
267 // Best-effort: bytes appended via `poll_write` are already visible to the driver
268 // and will be framed on the next tick. There's no application-level "flushed"
269 // state below us to wait on.
270 Poll::Ready(Ok(()))
271 }
272
273 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
274 // Mark the upgrade write-half closed. Once the driver drains the remaining
275 // outbound bytes, the upgrade body's `poll_read` will return `Ready(0)`, the
276 // send pump transitions through trailers (none) into `DATA(END_STREAM)`, and the
277 // stream tears down via the normal `complete_and_remove_stream` path.
278 log::trace!(
279 "h2 stream {}: H2Transport::poll_close marking outbound closed",
280 self.stream_id,
281 );
282 self.state
283 .send
284 .outbound_close_requested
285 .store(true, Ordering::Release);
286 self.state.send.outbound_waker.wake();
287 self.connection.outbound_waker().wake();
288 Poll::Ready(Ok(()))
289 }
290}
291
292/// Shared per-stream state. Owned by an [`Arc`] held jointly by the driver (via the connection's
293/// stream table) and the handler task (via [`H2Transport`]).
294#[derive(Debug, Default)]
295pub(super) struct StreamState {
296 /// Recv side: inbound DATA payloads, EOF flag, handler waker, handler-intent signal.
297 pub(super) recv: RecvState,
298 /// Send side: handoff slot from the conn task's `submit_send`, plus completion signaling
299 /// the conn task awaits.
300 pub(super) send: SendState,
301 /// Stream-error request raised from the conn-task side. Populated by
302 /// [`H2Connection::stream_error`][super::H2Connection::stream_error] when something on
303 /// the conn-task side (a body-read that detects content-length mismatch, a handler
304 /// that wants to abort) needs the driver to emit `RST_STREAM` and clean up. The driver
305 /// picks this up in `service_handler_signals` on its next tick and routes through
306 /// [`H2Driver::complete_and_remove_stream`][super::H2Driver]'s normal cleanup path.
307 ///
308 /// [`H2Driver`]: super::H2Driver
309 pub(super) pending_reset: Mutex<Option<H2ErrorCode>>,
310
311 /// Client-role: the application has dropped its [`H2Transport`] handle on a stream that
312 /// already wire-closed cleanly (both halves observed `END_STREAM`). The driver removes
313 /// the stream from both maps on its next `service_handler_signals` tick. No `RST_STREAM`
314 /// — the wire-side is already closed; this is purely application-side resource cleanup.
315 /// Distinct from [`Self::pending_reset`], which emits `RST_STREAM` for unclean teardown.
316 ///
317 /// Server role never sets this — server streams are removed eagerly when the response
318 /// finishes sending (no held-after-close lifecycle).
319 pub(super) pending_release: AtomicBool,
320
321 /// Mailbox flag for conn-task → driver work signaling.
322 ///
323 /// Set to `true` by conn-task code whenever it produces work the driver should service
324 /// (new submission, [`Self::pending_reset`], [`Self::pending_release`], a
325 /// [`RecvState::bytes_consumed`] increment, or a [`RecvState::is_reading`] transition).
326 /// The driver's `service_handler_signals` walks every stream and consults this flag via
327 /// `swap(false, AcqRel)` — only streams where it returns `true` pay for the per-field
328 /// pickup (mutex acquires for `submission` / `pending_reset`, etc.). Idle streams cost a
329 /// single atomic RMW per tick.
330 ///
331 /// **Setter ordering rule**: write the underlying state first, then store `true` with
332 /// `Release`, then call [`H2Connection::outbound_waker`][super::H2Connection]`.wake()`.
333 /// The `Release` store + driver's `AcqRel` swap form the synchronization edge that
334 /// publishes the underlying state to the driver. Over-notification (driver clears, finds
335 /// nothing, moves on) is harmless; under-notification would lose a signal — which is why
336 /// the underlying state must be written *before* the flag store.
337 pub(super) needs_servicing: AtomicBool,
338}
339
340/// Receive-side per-stream state.
341#[derive(Debug, Default)]
342pub(super) struct RecvState {
343 /// Inbound DATA body bytes awaiting handler read. A single persistent ring (append-at-tail,
344 /// `ignore_front`-at-head): the driver appends via `extend_from_slice` when a DATA frame
345 /// arrives; the handler reads from the front and virtually drops consumed bytes. When
346 /// `ignore_front` catches up to the data end the `Buffer` truncates to zero, so the
347 /// underlying `Vec` capacity stays bounded by peak in-flight bytes rather than cumulative
348 /// traffic — zero amortized allocations per DATA frame.
349 pub(super) buf: Mutex<Buffer>,
350
351 /// `true` once `END_STREAM` has been observed for this stream's recv side. Set by the
352 /// driver under the same `buf` lock used for pushes; checked by `poll_read` while
353 /// holding that lock to decide between EOF and Pending.
354 pub(super) eof: AtomicBool,
355
356 /// Handler-task waker, fired by the driver after pushing DATA into `buf` or after
357 /// setting `eof`. Single-waiter: only one task ever polls a given `H2Transport`.
358 pub(super) waker: AtomicWaker,
359
360 /// Set by the handler's first [`H2Transport::poll_read`] to declare intent to consume the
361 /// request body. The driver observes the transition and emits a `WINDOW_UPDATE` for this
362 /// stream, topping its recv window up from `SETTINGS_INITIAL_WINDOW_SIZE` (advertised as
363 /// `0`) to the per-stream maximum. Once set, stays set.
364 pub(super) is_reading: AtomicBool,
365
366 /// Bytes the handler has consumed from `buf` since the driver last sampled this counter.
367 /// Incremented by [`H2Transport::poll_read`] using `fetch_add` after each drain; the
368 /// driver reads it via `swap(0)` on each tick and emits stream-level + connection-level
369 /// `WINDOW_UPDATE` for the consumed total. Ensures a handler draining a body larger than
370 /// a single window doesn't stall the peer.
371 pub(super) bytes_consumed: AtomicU64,
372
373 /// Trailers, populated by the driver if a trailing HEADERS frame arrives for this stream.
374 /// Always written *before* `eof` is set, so once the handler observes `Ready(0)` on the
375 /// recv side, any trailers for this request are guaranteed to be in place.
376 ///
377 /// Taken out and moved into [`Conn::request_trailers`][crate::Conn] by the receiver-side
378 /// body state machine when it transitions to
379 /// [`ReceivedBodyState::End`][crate::received_body::ReceivedBodyState].
380 pub(super) trailers: Mutex<Option<Headers>>,
381
382 /// Client-role: response HEADERS field section, populated by the driver on the first
383 /// HEADERS frame arrival for a client-initiated stream. Server role doesn't use this slot
384 /// (response HEADERS go *out* on the server, not in). Single-shot: the conn task takes
385 /// the `FieldSection` via [`H2Connection::response_headers`][super::H2Connection] once;
386 /// subsequent HEADERS arrivals on the same stream are interpreted as trailers and routed
387 /// to the [`Self::trailers`] slot. 1xx interim responses are not modeled — the slot is
388 /// one `FieldSection` per stream, matching the same constraint elsewhere in trillium.
389 pub(super) response_headers: Mutex<Option<FieldSection<'static>>>,
390
391 /// Client-role: latching flag for "first HEADERS arrived for this stream." Distinct from
392 /// `response_headers.is_some()` — the conn task drains that slot when it consumes
393 /// headers, so the driver can't use slot occupancy to distinguish "haven't seen
394 /// HEADERS yet" from "headers seen + already taken." Set inside `finalize_response_headers`
395 /// before that slot is populated; checked by `route_headers` on subsequent HEADERS to
396 /// route them as trailers. Never cleared.
397 pub(super) first_response_headers_seen: AtomicBool,
398
399 /// Client-role: waker the conn task registers via
400 /// [`H2Connection::response_headers`][super::H2Connection]; fired by the driver after
401 /// stashing the `FieldSection` in [`Self::response_headers`] *or* on stream removal (so
402 /// a parked conn task observing the stream gone surfaces `NotConnected` instead of
403 /// hanging).
404 pub(super) response_headers_waker: AtomicWaker,
405}
406
407/// Send-side per-stream state used to hand a response from the conn task to the driver,
408/// plus the outbound byte queue for extended-CONNECT upgraded streams.
409///
410/// **Normal response path**: the conn task fills `submission` once via
411/// [`H2Connection::submit_send`][submit] and waits on `completion_waker` for `completed` to
412/// flip. The driver picks up the submission on its next `drive` tick, frames it (HEADERS,
413/// DATA, optional trailing HEADERS) into the connection's outbound buffer as send-side flow
414/// control allows, and on completion stores the `completion_result`, sets `completed = true`,
415/// and wakes the conn task.
416///
417/// **Extended-CONNECT upgrade path** ([RFC 8441]): the conn task calls
418/// [`H2Connection::submit_upgrade`][submit_upgrade], which constructs an
419/// [`H2OutboundReader`] over `outbound` / `outbound_close_requested` /
420/// `outbound_waker` and submits it as the response body. The driver signals
421/// `completion_waker` as soon as the response HEADERS frame is on the wire (instead of
422/// waiting for the body to drain), so the conn task's `submit_upgrade().await` returns and
423/// the runtime adapter can dispatch [`Handler::upgrade`][trillium::Handler::upgrade]. The
424/// upgrade handler then writes through [`H2Transport`]'s `AsyncWrite`, which appends to
425/// `outbound`; the driver's send pump pulls those bytes out via the body's `AsyncRead`
426/// and frames them as DATA. Closing the transport sets `outbound_close_requested`, the
427/// reader returns `Ready(0)`, and the send pump terminates the stream with
428/// `DATA(END_STREAM)`.
429///
430/// [submit]: super::H2Connection::submit_send
431/// [submit_upgrade]: super::H2Connection::submit_upgrade
432/// [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
433#[derive(Debug, Default)]
434pub(super) struct SendState {
435 /// Slot for the conn task's submission. Some between `submit_send` and the driver's
436 /// pickup tick; None at all other times.
437 pub(super) submission: Mutex<Option<Submission>>,
438
439 /// Set to `true` by the driver once the response has been fully framed, flushed, or
440 /// errored. The conn task's `SubmitSend` future polls this atomic and registers on
441 /// `completion_waker`.
442 pub(super) completed: AtomicBool,
443
444 /// The driver writes the final result here before flipping `completed`. The conn task
445 /// takes it once `completed` is observed true.
446 pub(super) completion_result: Mutex<Option<io::Result<()>>>,
447
448 /// The conn task's waker, registered by `SubmitSend::poll` and fired by the driver
449 /// after `completed` is set.
450 pub(super) completion_waker: AtomicWaker,
451
452 /// Outbound bytes for an extended-CONNECT (RFC 8441) upgraded stream.
453 /// Appended to by [`H2Transport`]'s `AsyncWrite::poll_write` and drained by the
454 /// upgrade body's `AsyncRead::poll_read` (the driver-task side of the send pump).
455 /// Empty for normal responses — the driver pumps the response [`Body`] directly.
456 pub(super) outbound: Mutex<Buffer>,
457
458 /// Set by [`H2Transport::poll_close`] to mark the upgrade-side write half closed.
459 /// The upgrade body's `poll_read` returns `Ready(0)` once `outbound` is empty and
460 /// this flag is set, which transitions the driver's send pump into the
461 /// trailers/`DATA(END_STREAM)` phase.
462 pub(super) outbound_close_requested: AtomicBool,
463
464 /// Waker for the upgrade body's `poll_read`. Fired by [`H2Transport::poll_write`]
465 /// after appending bytes and by [`H2Transport::poll_close`] after flipping
466 /// `outbound_close_requested`. Registered by the body during its `poll_read` when
467 /// it observes an empty buffer and no close flag.
468 pub(super) outbound_waker: AtomicWaker,
469
470 /// Reverse-direction waker: registered by [`H2Transport::poll_write`] when `outbound`
471 /// has reached the configured cap, fired by [`H2OutboundReader::poll_read`] after it
472 /// drains bytes (i.e. after `ignore_front`) so a parked writer can resume. This is the
473 /// edge that surfaces peer flow-control backpressure to the upgrade handler — without
474 /// it, a slow or unresponsive peer's closed window would let `outbound` grow without
475 /// bound.
476 pub(super) outbound_write_waker: AtomicWaker,
477}
478
479/// `AsyncRead` source the driver uses as the response body for an extended-CONNECT upgrade.
480///
481/// Reads from [`SendState::outbound`] — the same per-stream queue [`H2Transport`]'s
482/// `AsyncWrite::poll_write` appends to. Returns `Ready(0)` once the queue is empty and
483/// [`SendState::outbound_close_requested`] has been set (handler dropped or called
484/// `poll_close` on the transport), at which point the driver's send pump transitions
485/// through trailers (none) into `DATA(END_STREAM)` and tears the stream down.
486///
487/// Constructed by [`H2Connection::submit_upgrade`][super::H2Connection::submit_upgrade];
488/// wrapped in [`Body::new_streaming`] so the existing send pump can pump it as if it were
489/// any other unknown-length response body.
490#[derive(Debug)]
491pub(super) struct H2OutboundReader {
492 state: Arc<StreamState>,
493 stream_id: u32,
494}
495
496impl H2OutboundReader {
497 pub(super) fn new(state: Arc<StreamState>, stream_id: u32) -> Self {
498 Self { state, stream_id }
499 }
500}
501
502impl AsyncRead for H2OutboundReader {
503 fn poll_read(
504 self: Pin<&mut Self>,
505 cx: &mut Context<'_>,
506 out: &mut [u8],
507 ) -> Poll<io::Result<usize>> {
508 if out.is_empty() {
509 return Poll::Ready(Ok(0));
510 }
511
512 let send = &self.state.send;
513 let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
514 let take = out.len().min(outbound.len());
515 if take > 0 {
516 out[..take].copy_from_slice(&outbound[..take]);
517 outbound.ignore_front(take);
518 log::trace!(
519 "h2 stream {}: H2OutboundReader::poll_read drained {take} bytes",
520 self.stream_id,
521 );
522 // Drop the lock before waking — the writer reacquires it on resume.
523 drop(outbound);
524 // Surface flow-control backpressure: wake any writer parked on
525 // `outbound_write_waker` because the cap was hit. Registered-but-still-full
526 // is harmless — the writer's recheck under lock observes the new len.
527 send.outbound_write_waker.wake();
528 return Poll::Ready(Ok(take));
529 }
530
531 // Queue empty. Register first, then re-check the close flag. This closes the
532 // register-then-check race against `poll_close` (which doesn't take the buf
533 // lock — it just stores the flag and fires the waker). Holding the buf lock
534 // means `poll_write` can't race here; only `poll_close` can.
535 send.outbound_waker.register(cx.waker());
536
537 if send.outbound_close_requested.load(Ordering::Acquire) {
538 log::trace!(
539 "h2 stream {}: H2OutboundReader::poll_read EOF (close_requested + empty)",
540 self.stream_id,
541 );
542 return Poll::Ready(Ok(0));
543 }
544 Poll::Pending
545 }
546}
547
548/// What the conn task hands the driver to begin a send on a stream.
549///
550/// `body` carries either a normal response body or, for extended-CONNECT (RFC 8441)
551/// upgrades, a streaming body that reads from [`SendState::outbound`] (which the upgrade
552/// handler's [`H2Transport`] `AsyncWrite` writes into). Trailers (if any) come from
553/// [`Body::trailers`] after drain — not a separate field.
554///
555/// `is_upgrade` flips the driver's completion semantics: instead of signaling
556/// [`SubmitSend`][super::SubmitSend] completion after the body is fully on the wire, the
557/// driver signals completion as soon as the response HEADERS frame is flushed. That lets
558/// [`Conn::send_h2`][crate::Conn::send_h2] return so the runtime can dispatch
559/// [`Handler::upgrade`][trillium::Handler::upgrade], while the body keeps streaming in the
560/// background.
561#[derive(Debug)]
562pub(super) struct Submission {
563 /// Owned pseudo-headers for the response/request. Combined with `headers` on the driver
564 /// task to form a [`FieldSection`] which is then HPACK-encoded synchronously via
565 /// [`HpackEncoder`][crate::headers::hpack::HpackEncoder] at submission pickup. The
566 /// encoder runs only on the driver task: each pickup-tick encodes its submissions
567 /// against the live dynamic-table state, then frames HEADERS in the order they were
568 /// encoded — matching the wire-emission order that HPACK's stateful decoder requires.
569 pub(super) pseudos: PseudoHeaders<'static>,
570 /// Owned headers for the block. Cloned from the conn task's `request_headers` /
571 /// `response_headers` so those remain readable to caller and middleware after the send.
572 pub(super) headers: Headers,
573 pub(super) body: Option<Body>,
574 pub(super) is_upgrade: bool,
575}
576
577impl Submission {
578 /// Borrow this submission's headers as a [`FieldSection`] for encoding.
579 pub(super) fn field_section(&self) -> FieldSection<'_> {
580 FieldSection::new(self.pseudos.clone(), &self.headers)
581 }
582}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587 use crate::HttpContext;
588 use futures_lite::{AsyncRead, AsyncWrite};
589 use std::{
590 sync::{
591 Arc,
592 atomic::{AtomicBool, Ordering},
593 },
594 task::{Context, Poll, Wake, Waker},
595 };
596
597 struct CountingWaker(AtomicBool);
598 impl Wake for CountingWaker {
599 fn wake(self: Arc<Self>) {
600 self.0.store(true, Ordering::Release);
601 }
602 }
603
604 fn pair_with_cap(cap: usize) -> (H2Transport, H2OutboundReader) {
605 let mut context = HttpContext::new();
606 context.config.response_buffer_max_len = cap;
607 let connection = H2Connection::new(Arc::new(context));
608 let state = Arc::new(StreamState::default());
609 let transport = H2Transport::new(connection.clone(), 1, state.clone());
610 let reader = H2OutboundReader::new(state, 1);
611 (transport, reader)
612 }
613
614 #[test]
615 fn poll_write_caps_at_response_buffer_max_len() {
616 // Cap of 16 bytes. Writing 32 bytes should accept exactly 16 (partial-write
617 // semantics; AsyncWriteExt::write_all retries the rest).
618 let (mut transport, _reader) = pair_with_cap(16);
619 let waker = Waker::from(Arc::new(CountingWaker(AtomicBool::new(false))));
620 let mut cx = Context::from_waker(&waker);
621
622 let buf = [0u8; 32];
623 match Pin::new(&mut transport).poll_write(&mut cx, &buf) {
624 Poll::Ready(Ok(n)) => assert_eq!(n, 16, "should accept exactly cap bytes"),
625 other => panic!("expected Ready(Ok(16)), got {other:?}"),
626 }
627 }
628
629 #[test]
630 fn poll_write_returns_pending_when_full_and_drain_wakes() {
631 let (mut transport, mut reader) = pair_with_cap(8);
632 let counting = Arc::new(CountingWaker(AtomicBool::new(false)));
633 let writer_waker = Waker::from(counting.clone());
634 let mut writer_cx = Context::from_waker(&writer_waker);
635
636 // Fill the buffer to the cap.
637 let buf = [0u8; 8];
638 match Pin::new(&mut transport).poll_write(&mut writer_cx, &buf) {
639 Poll::Ready(Ok(8)) => {}
640 other => panic!("expected Ready(Ok(8)), got {other:?}"),
641 }
642
643 // Next write must return Pending — buffer is at cap.
644 let extra = [0u8; 4];
645 match Pin::new(&mut transport).poll_write(&mut writer_cx, &extra) {
646 Poll::Pending => {}
647 other => panic!("expected Pending, got {other:?}"),
648 }
649 assert!(
650 !counting.0.load(Ordering::Acquire),
651 "writer waker should not have fired yet"
652 );
653
654 // Drain via the reader — this should wake the writer.
655 let reader_waker = Waker::noop().clone();
656 let mut reader_cx = Context::from_waker(&reader_waker);
657 let mut sink = [0u8; 4];
658 match Pin::new(&mut reader).poll_read(&mut reader_cx, &mut sink) {
659 Poll::Ready(Ok(4)) => {}
660 other => panic!("expected Ready(Ok(4)), got {other:?}"),
661 }
662 assert!(
663 counting.0.load(Ordering::Acquire),
664 "drain should have woken the writer"
665 );
666
667 // Re-poll the writer — there's now room for the 4 extra bytes.
668 match Pin::new(&mut transport).poll_write(&mut writer_cx, &extra) {
669 Poll::Ready(Ok(4)) => {}
670 other => panic!("expected Ready(Ok(4)), got {other:?}"),
671 }
672 }
673}