Skip to main content

trillium_http/h2/
transport.rs

1//! Per-stream transport handed to handler tasks.
2//!
3//! [`H2Transport`] is the [`AsyncRead`] + [`AsyncWrite`] view of a single HTTP/2 stream. It is
4//! carried on the emitted [`Conn`][crate::Conn] returned from [`H2Driver::next`], and the
5//! runtime adapter spawns a handler task that consumes it. The transport never touches the
6//! underlying TCP connection directly — all I/O coordinates through shared per-stream state
7//! on the [`H2Connection`] driven by the driver task.
8//!
9//! Two paths reach the impls:
10//!
11//! - **Normal HTTP/2 request/response**: handlers usually don't touch [`H2Transport`] directly
12//!   (same sharp edge h1 and h3 document). [`ReceivedBody`][crate::ReceivedBody] reads request body
13//!   bytes through the transport's `AsyncRead` via
14//!   [`ReceivedBody::handle_h2_data`][crate::ReceivedBody::handle_h2_data]. Response bytes flow
15//!   through [`H2Connection::submit_send`][submit_send] to the driver's send pump, which frames
16//!   HEADERS + DATA + trailing HEADERS onto the connection without ever touching this `AsyncWrite`.
17//!
18//! - **Extended-CONNECT upgrades** ([RFC 8441] WebSocket-over-h2, plus the in-progress
19//!   `draft-ietf-webtrans-http2` for WebTransport-over-h2): after the handler responds 200 to a
20//!   `CONNECT` request with a `:protocol` pseudo-header, [`Conn::send_h2`][crate::Conn::send_h2]
21//!   routes through [`H2Connection::submit_upgrade`][submit_upgrade] which frames HEADERS without
22//!   `END_STREAM`, signals send completion early, and leaves the stream open as a bidirectional
23//!   byte channel. The runtime adapter then dispatches
24//!   [`Handler::upgrade`][trillium::Handler::upgrade], which gets an [`Upgrade`][crate::Upgrade]
25//!   wrapping this transport. `AsyncWrite::poll_write` appends to a per-stream outbound queue
26//!   ([`SendState::outbound`]); the driver's send pump drains it into DATA frames bounded by the
27//!   per-stream and connection send windows. `AsyncWrite::poll_close` flips
28//!   [`SendState::outbound_close_requested`] so the driver eventually emits `DATA(END_STREAM)` and
29//!   tears the stream down.
30//!
31//! [`H2Driver::next`]: super::H2Driver::next
32//! [`H2Connection`]: super::H2Connection
33//! [`BoxedTransport`]: crate::transport::BoxedTransport
34//! [submit_send]: super::H2Connection::submit_send
35//! [submit_upgrade]: super::H2Connection::submit_upgrade
36//! [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
37
38use super::{H2Connection, H2ErrorCode};
39use crate::{
40    Body, Buffer, Headers,
41    headers::hpack::{FieldSection, PseudoHeaders},
42};
43use atomic_waker::AtomicWaker;
44use futures_lite::io::{AsyncRead, AsyncWrite};
45use std::{
46    fmt, io,
47    pin::Pin,
48    sync::{
49        Arc, Mutex,
50        atomic::{AtomicBool, AtomicU64, Ordering},
51    },
52    task::{Context, Poll},
53};
54
/// A single HTTP/2 stream's transport handle.
///
/// Carries a backref to the shared [`H2Connection`], the stream id, and the per-stream
/// `Arc<StreamState>` used by the read side. Normal HTTP/2 operation reads through
/// [`ReceivedBody`][crate::ReceivedBody] and writes through the connection's send queue;
/// the `AsyncRead` / `AsyncWrite` impls here are only reached by code that borrows the
/// transport directly (typically an upgrade handler after extended CONNECT).
pub struct H2Transport {
    /// Shared connection handle; all wire I/O routes through the driver task that
    /// owns the actual TCP connection — this transport never touches it directly.
    connection: Arc<H2Connection>,
    /// The HTTP/2 stream identifier this transport is bound to.
    stream_id: u32,
    /// Per-stream shared state, jointly owned with the driver via the connection's
    /// stream table (see [`StreamState`]).
    state: Arc<StreamState>,
}
67
68impl fmt::Debug for H2Transport {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        f.debug_struct("H2Transport")
71            .field("stream_id", &self.stream_id)
72            .finish_non_exhaustive()
73    }
74}
75
76impl H2Transport {
77    /// Create a transport for a stream that has just been opened by the driver.
78    pub(super) fn new(
79        connection: Arc<H2Connection>,
80        stream_id: u32,
81        state: Arc<StreamState>,
82    ) -> Self {
83        Self {
84            connection,
85            stream_id,
86            state,
87        }
88    }
89
90    /// The stream identifier this transport is bound to.
91    pub fn stream_id(&self) -> u32 {
92        self.stream_id
93    }
94
95    /// The shared [`H2Connection`] backing this stream.
96    pub fn connection(&self) -> &Arc<H2Connection> {
97        &self.connection
98    }
99}
100
impl Drop for H2Transport {
    /// Application-side release / cancel signal, depending on stream state:
    ///
    /// - **Wire-closed cleanly** (`send.completed && recv.eof`): the application is done with a
    ///   stream that already finished on the wire. The client-role lifecycle keeps such streams in
    ///   the map after wire-close (see [`H2Driver::try_close_if_both_done`][super::H2Driver]) so
    ///   the application's transport handle remains valid for trailer / late-read access. Dropping
    ///   the transport is the signal that the application is done, and we forward it to the
    ///   connection so the driver removes the entry from both maps.
    ///
    /// - **Wire-incomplete** (handler panic, conn task abandoned, client awaiting a response that
    ///   never came): emit `RST_STREAM(Cancel)` so the peer learns we're abandoning the stream.
    ///   Without this the leak persists until the entire connection tears down. Symmetric for both
    ///   roles.
    ///
    /// - **Already gone from the shared map**: driver beat us to cleanup; no-op.
    ///
    /// - **Upgrade path graceful close in flight** (`outbound_close_requested`): user has already
    ///   asked for graceful close via [`Self::poll_close`]; the driver is draining the outbound
    ///   queue + emitting `DATA(END_STREAM)`. Don't RST in that window.
    fn drop(&mut self) {
        // Cheap pre-check: if the stream is no longer in the shared map the driver has
        // already cleaned up; nothing to do.
        if !self.connection.streams_lock().contains_key(&self.stream_id) {
            return;
        }
        // Snapshot both halves' completion flags; both true means the stream already
        // wire-closed cleanly and this drop is purely an application-side release.
        let send_done = self.state.send.completed.load(Ordering::Acquire);
        let recv_done = self.state.recv.eof.load(Ordering::Acquire);
        if send_done && recv_done {
            log::trace!(
                "h2 stream {}: H2Transport dropped on wire-closed stream — releasing",
                self.stream_id,
            );
            self.connection.release_stream(self.stream_id);
            return;
        }
        // Upgrade path graceful close in flight — let the driver finish.
        if self
            .state
            .send
            .outbound_close_requested
            .load(Ordering::Acquire)
        {
            return;
        }
        // Wire-incomplete and no graceful close pending: tell the peer we're
        // abandoning the stream so neither side leaks it.
        log::debug!(
            "h2 stream {}: H2Transport dropped mid-stream — RST_STREAM(Cancel) \
             (send_done={send_done}, recv_done={recv_done})",
            self.stream_id,
        );
        self.connection
            .stream_error(self.stream_id, H2ErrorCode::Cancel);
    }
}
155
impl AsyncRead for H2Transport {
    /// Read request-body bytes pushed by the driver into [`RecvState::buf`].
    ///
    /// Returns `Ready(0)` either for an empty `out` buffer or at EOF (driver observed
    /// `END_STREAM`); otherwise copies from the ring, credits flow control, and wakes
    /// the driver — or parks on [`RecvState::waker`] when the ring is empty.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        out: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        if out.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // The first `poll_read` is the handler's declaration of intent to consume the request
        // body — until this point, we've advertised a zero recv window and the peer has sent
        // nothing beyond HEADERS. Tell the driver to top up our per-stream window now. Later
        // calls observe `is_reading` already set by the `swap` and don't re-signal.
        let recv_state = &self.state.recv;
        let connection = &*self.connection;
        if !recv_state.is_reading.swap(true, Ordering::AcqRel) {
            self.state.needs_servicing.store(true, Ordering::Release);
            connection.outbound_waker().wake();
        }

        let mut recv = recv_state.buf.lock().expect("recv buf mutex poisoned");

        // Copy as many bytes as fit from the front of the ring into `out`, then advance the
        // ring's virtual read cursor. `Buffer::ignore_front` truncates the underlying `Vec` to
        // zero when we drain fully, so capacity stays bounded by peak in-flight bytes rather
        // than cumulative traffic.
        let take = out.len().min(recv.len());
        if take > 0 {
            out[..take].copy_from_slice(&recv[..take]);
            recv.ignore_front(take);
            // Drop the buf lock before the waker fire so the driver can grab it without
            // contention when it wakes.
            drop(recv);
            // Tell the driver how many bytes the handler consumed so it can emit a matching
            // `WINDOW_UPDATE` and keep the peer's stream + connection windows topped up.
            // `fetch_add` accumulates across calls that happen before the driver's next
            // service tick; the driver's `swap(0)` takes the whole batch at once.
            recv_state
                .bytes_consumed
                .fetch_add(take as u64, Ordering::AcqRel);
            self.state.needs_servicing.store(true, Ordering::Release);
            connection.outbound_waker().wake();
            return Poll::Ready(Ok(take));
        }

        // Buffer empty. EOF if END_STREAM was observed, otherwise register and wait.
        // The driver acquires the same `buf` lock to push data and to set `eof`, so holding
        // it here is enough to make the eof check final — no register-then-check race window
        // between us and the driver's wake.
        if recv_state.eof.load(Ordering::Acquire) {
            return Poll::Ready(Ok(0));
        }
        recv_state.waker.register(cx.waker());
        Poll::Pending
    }
}
213
impl AsyncWrite for H2Transport {
    /// Append bytes to the per-stream outbound queue (extended-CONNECT upgrade path).
    ///
    /// Returns `Ready(Ok(n))` for the bytes accepted (possibly fewer than `buf.len()`
    /// when near the cap), `Pending` when the queue is at the configured cap, and
    /// `BrokenPipe` once `poll_close` has been called.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Append into the per-stream outbound queue used by the extended-CONNECT
        // (RFC 8441) upgrade path. The driver's send pump drains the same queue (via
        // the upgrade body's `AsyncRead::poll_read`) into DATA frames bounded by
        // per-stream + connection send windows.
        //
        // Bounded by `config.response_buffer_max_len` — the same cap h1 and h3 response
        // paths use for their transit buffers. If the peer's flow-control window stalls
        // (slow or malicious reader) the driver can't drain `outbound`, the cap is hit,
        // and we return `Pending` so the handler is throttled. The drain side
        // (`H2OutboundReader::poll_read`) wakes `outbound_write_waker` after each take.
        let send = &self.state.send;

        if send.outbound_close_requested.load(Ordering::Acquire) {
            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
        }

        let cap = self.connection.context.config.response_buffer_max_len;
        let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
        if outbound.len() >= cap {
            // At cap: register for the drain-side wake, then park. The drain side
            // (`H2OutboundReader::poll_read`) acquires this same `outbound` lock to
            // `ignore_front` and fires `outbound_write_waker` only *after* releasing
            // it, so a drain's wake cannot land before this register: either the
            // drain fully completed before we took the lock (and our length check
            // above saw the freed space), or its wake fires after we've registered.
            // NOTE(review): because the lock is held across both checks, the re-check
            // below cannot observe a different length — it's purely defensive.
            send.outbound_write_waker.register(cx.waker());
            if outbound.len() >= cap {
                return Poll::Pending;
            }
        }
        let take = (cap - outbound.len()).min(buf.len());
        log::trace!(
            "h2 stream {}: H2Transport::poll_write appending {take}/{} bytes to outbound queue",
            self.stream_id,
            buf.len(),
        );
        outbound.extend_from_slice(&buf[..take]);
        drop(outbound);

        // Wake the driver task (if parked on the connection-level waker) and the
        // upgrade body's poll_read (in case it's registered between driver ticks).
        // Firing both is cheap and resolves the cross-task race where the driver
        // happens to be parked on `connection.outbound_waker` rather than mid-body-poll.
        send.outbound_waker.wake();
        self.connection.outbound_waker().wake();
        Poll::Ready(Ok(take))
    }

    /// No-op flush: appended bytes are already visible to the driver.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Best-effort: bytes appended via `poll_write` are already visible to the driver
        // and will be framed on the next tick. There's no application-level "flushed"
        // state below us to wait on.
        Poll::Ready(Ok(()))
    }

    /// Request graceful close of the upgrade write half; always completes immediately.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Mark the upgrade write-half closed. Once the driver drains the remaining
        // outbound bytes, the upgrade body's `poll_read` will return `Ready(0)`, the
        // send pump transitions through trailers (none) into `DATA(END_STREAM)`, and the
        // stream tears down via the normal `complete_and_remove_stream` path.
        log::trace!(
            "h2 stream {}: H2Transport::poll_close marking outbound closed",
            self.stream_id,
        );
        self.state
            .send
            .outbound_close_requested
            .store(true, Ordering::Release);
        self.state.send.outbound_waker.wake();
        self.connection.outbound_waker().wake();
        Poll::Ready(Ok(()))
    }
}
291
/// Shared per-stream state. Owned by an [`Arc`] held jointly by the driver (via the connection's
/// stream table) and the handler task (via [`H2Transport`]).
#[derive(Debug, Default)]
pub(super) struct StreamState {
    /// Recv side: inbound DATA payloads, EOF flag, handler waker, handler-intent signal.
    pub(super) recv: RecvState,
    /// Send side: handoff slot from the conn task's `submit_send`, plus completion signaling
    /// the conn task awaits.
    pub(super) send: SendState,
    /// Stream-error request raised from the conn-task side. Populated by
    /// [`H2Connection::stream_error`][super::H2Connection::stream_error] when something on
    /// the conn-task side (a body-read that detects content-length mismatch, a handler
    /// that wants to abort) needs the driver to emit `RST_STREAM` and clean up. The driver
    /// picks this up in `service_handler_signals` on its next tick and routes through
    /// [`H2Driver::complete_and_remove_stream`][super::H2Driver]'s normal cleanup path.
    ///
    /// [`H2Driver`]: super::H2Driver
    pub(super) pending_reset: Mutex<Option<H2ErrorCode>>,

    /// Client-role: the application has dropped its [`H2Transport`] handle on a stream that
    /// already wire-closed cleanly (both halves observed `END_STREAM`). The driver removes
    /// the stream from both maps on its next `service_handler_signals` tick. No `RST_STREAM`
    /// — the wire-side is already closed; this is purely application-side resource cleanup.
    /// Distinct from [`Self::pending_reset`], which emits `RST_STREAM` for unclean teardown.
    ///
    /// Server role never sets this — server streams are removed eagerly when the response
    /// finishes sending (no held-after-close lifecycle).
    pub(super) pending_release: AtomicBool,

    /// Mailbox flag for conn-task → driver work signaling.
    ///
    /// Set to `true` by conn-task code whenever it produces work the driver should service
    /// (new submission, [`Self::pending_reset`], [`Self::pending_release`], a
    /// [`RecvState::bytes_consumed`] increment, or a [`RecvState::is_reading`] transition).
    /// The driver's `service_handler_signals` walks every stream and consults this flag via
    /// `swap(false, AcqRel)` — only streams where it returns `true` pay for the per-field
    /// pickup (mutex acquires for `submission` / `pending_reset`, etc.). Idle streams cost a
    /// single atomic RMW per tick.
    ///
    /// **Setter ordering rule**: write the underlying state first, then store `true` with
    /// `Release`, then call [`H2Connection::outbound_waker`][super::H2Connection]`.wake()`.
    /// The `Release` store + driver's `AcqRel` swap form the synchronization edge that
    /// publishes the underlying state to the driver. Over-notification (driver clears, finds
    /// nothing, moves on) is harmless; under-notification would lose a signal — which is why
    /// the underlying state must be written *before* the flag store.
    pub(super) needs_servicing: AtomicBool,
}
339
/// Receive-side per-stream state.
///
/// Producer is the driver task (DATA frames, trailers, EOF); consumer is the handler task
/// via [`H2Transport::poll_read`] or, client-role, the conn task via the response-headers
/// accessors.
#[derive(Debug, Default)]
pub(super) struct RecvState {
    /// Inbound DATA body bytes awaiting handler read. A single persistent ring (append-at-tail,
    /// `ignore_front`-at-head): the driver appends via `extend_from_slice` when a DATA frame
    /// arrives; the handler reads from the front and virtually drops consumed bytes. When
    /// `ignore_front` catches up to the data end the `Buffer` truncates to zero, so the
    /// underlying `Vec` capacity stays bounded by peak in-flight bytes rather than cumulative
    /// traffic — zero amortized allocations per DATA frame.
    pub(super) buf: Mutex<Buffer>,

    /// `true` once `END_STREAM` has been observed for this stream's recv side. Set by the
    /// driver under the same `buf` lock used for pushes; checked by `poll_read` while
    /// holding that lock to decide between EOF and Pending.
    pub(super) eof: AtomicBool,

    /// Handler-task waker, fired by the driver after pushing DATA into `buf` or after
    /// setting `eof`. Single-waiter: only one task ever polls a given `H2Transport`.
    pub(super) waker: AtomicWaker,

    /// Set by the handler's first [`H2Transport::poll_read`] to declare intent to consume the
    /// request body. The driver observes the transition and emits a `WINDOW_UPDATE` for this
    /// stream, topping its recv window up from `SETTINGS_INITIAL_WINDOW_SIZE` (advertised as
    /// `0`) to the per-stream maximum. Once set, stays set.
    pub(super) is_reading: AtomicBool,

    /// Bytes the handler has consumed from `buf` since the driver last sampled this counter.
    /// Incremented by [`H2Transport::poll_read`] using `fetch_add` after each drain; the
    /// driver reads it via `swap(0)` on each tick and emits stream-level + connection-level
    /// `WINDOW_UPDATE` for the consumed total. Ensures a handler draining a body larger than
    /// a single window doesn't stall the peer.
    pub(super) bytes_consumed: AtomicU64,

    /// Trailers, populated by the driver if a trailing HEADERS frame arrives for this stream.
    /// Always written *before* `eof` is set, so once the handler observes `Ready(0)` on the
    /// recv side, any trailers for this request are guaranteed to be in place.
    ///
    /// Taken out and moved into [`Conn::request_trailers`][crate::Conn] by the receiver-side
    /// body state machine when it transitions to
    /// [`ReceivedBodyState::End`][crate::received_body::ReceivedBodyState].
    pub(super) trailers: Mutex<Option<Headers>>,

    /// Client-role: response HEADERS field section, populated by the driver on the first
    /// HEADERS frame arrival for a client-initiated stream. Server role doesn't use this slot
    /// (response HEADERS go *out* on the server, not in). Single-shot: the conn task takes
    /// the `FieldSection` via [`H2Connection::response_headers`][super::H2Connection] once;
    /// subsequent HEADERS arrivals on the same stream are interpreted as trailers and routed
    /// to the [`Self::trailers`] slot. 1xx interim responses are not modeled — the slot is
    /// one `FieldSection` per stream, matching the same constraint elsewhere in trillium.
    pub(super) response_headers: Mutex<Option<FieldSection<'static>>>,

    /// Client-role: latching flag for "first HEADERS arrived for this stream." Distinct from
    /// `response_headers.is_some()` — the conn task drains that slot when it consumes
    /// headers, so the driver can't use slot occupancy to distinguish "haven't seen
    /// HEADERS yet" from "headers seen + already taken." Set inside `finalize_response_headers`
    /// before that slot is populated; checked by `route_headers` on subsequent HEADERS to
    /// route them as trailers. Never cleared.
    pub(super) first_response_headers_seen: AtomicBool,

    /// Client-role: waker the conn task registers via
    /// [`H2Connection::response_headers`][super::H2Connection]; fired by the driver after
    /// stashing the `FieldSection` in [`Self::response_headers`] *or* on stream removal (so
    /// a parked conn task observing the stream gone surfaces `NotConnected` instead of
    /// hanging).
    pub(super) response_headers_waker: AtomicWaker,
}
406
/// Send-side per-stream state used to hand a response from the conn task to the driver,
/// plus the outbound byte queue for extended-CONNECT upgraded streams.
///
/// **Normal response path**: the conn task fills `submission` once via
/// [`H2Connection::submit_send`][submit] and waits on `completion_waker` for `completed` to
/// flip. The driver picks up the submission on its next `drive` tick, frames it (HEADERS,
/// DATA, optional trailing HEADERS) into the connection's outbound buffer as send-side flow
/// control allows, and on completion stores the `completion_result`, sets `completed = true`,
/// and wakes the conn task.
///
/// **Extended-CONNECT upgrade path** ([RFC 8441]): the conn task calls
/// [`H2Connection::submit_upgrade`][submit_upgrade], which constructs an
/// [`H2OutboundReader`] over `outbound` / `outbound_close_requested` /
/// `outbound_waker` and submits it as the response body. The driver signals
/// `completion_waker` as soon as the response HEADERS frame is on the wire (instead of
/// waiting for the body to drain), so the conn task's `submit_upgrade().await` returns and
/// the runtime adapter can dispatch [`Handler::upgrade`][trillium::Handler::upgrade]. The
/// upgrade handler then writes through [`H2Transport`]'s `AsyncWrite`, which appends to
/// `outbound`; the driver's send pump pulls those bytes out via the body's `AsyncRead`
/// and frames them as DATA. Closing the transport sets `outbound_close_requested`, the
/// reader returns `Ready(0)`, and the send pump terminates the stream with
/// `DATA(END_STREAM)`.
///
/// [submit]: super::H2Connection::submit_send
/// [submit_upgrade]: super::H2Connection::submit_upgrade
/// [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
#[derive(Debug, Default)]
pub(super) struct SendState {
    /// Slot for the conn task's submission. Some between `submit_send` and the driver's
    /// pickup tick; None at all other times.
    pub(super) submission: Mutex<Option<Submission>>,

    /// Set to `true` by the driver once the response has been fully framed, flushed, or
    /// errored. The conn task's `SubmitSend` future polls this atomic and registers on
    /// `completion_waker`.
    pub(super) completed: AtomicBool,

    /// The driver writes the final result here before flipping `completed`. The conn task
    /// takes it once `completed` is observed true.
    pub(super) completion_result: Mutex<Option<io::Result<()>>>,

    /// The conn task's waker, registered by `SubmitSend::poll` and fired by the driver
    /// after `completed` is set.
    pub(super) completion_waker: AtomicWaker,

    /// Outbound bytes for an extended-CONNECT (RFC 8441) upgraded stream.
    /// Appended to by [`H2Transport`]'s `AsyncWrite::poll_write` and drained by the
    /// upgrade body's `AsyncRead::poll_read` (the driver-task side of the send pump).
    /// Empty for normal responses — the driver pumps the response [`Body`] directly.
    pub(super) outbound: Mutex<Buffer>,

    /// Set by [`H2Transport::poll_close`] to mark the upgrade-side write half closed.
    /// The upgrade body's `poll_read` returns `Ready(0)` once `outbound` is empty and
    /// this flag is set, which transitions the driver's send pump into the
    /// trailers/`DATA(END_STREAM)` phase.
    pub(super) outbound_close_requested: AtomicBool,

    /// Waker for the upgrade body's `poll_read`. Fired by [`H2Transport::poll_write`]
    /// after appending bytes and by [`H2Transport::poll_close`] after flipping
    /// `outbound_close_requested`. Registered by the body during its `poll_read` when
    /// it observes an empty buffer and no close flag.
    pub(super) outbound_waker: AtomicWaker,

    /// Reverse-direction waker: registered by [`H2Transport::poll_write`] when `outbound`
    /// has reached the configured cap, fired by [`H2OutboundReader::poll_read`] after it
    /// drains bytes (i.e. after `ignore_front`) so a parked writer can resume. This is the
    /// edge that surfaces peer flow-control backpressure to the upgrade handler — without
    /// it, a slow or unresponsive peer's closed window would let `outbound` grow without
    /// bound.
    pub(super) outbound_write_waker: AtomicWaker,
}
478
/// `AsyncRead` source the driver uses as the response body for an extended-CONNECT upgrade.
///
/// Reads from [`SendState::outbound`] — the same per-stream queue [`H2Transport`]'s
/// `AsyncWrite::poll_write` appends to. Returns `Ready(0)` once the queue is empty and
/// [`SendState::outbound_close_requested`] has been set (handler dropped or called
/// `poll_close` on the transport), at which point the driver's send pump transitions
/// through trailers (none) into `DATA(END_STREAM)` and tears the stream down.
///
/// Constructed by [`H2Connection::submit_upgrade`][super::H2Connection::submit_upgrade];
/// wrapped in [`Body::new_streaming`] so the existing send pump can pump it as if it were
/// any other unknown-length response body.
#[derive(Debug)]
pub(super) struct H2OutboundReader {
    /// Shared per-stream state; only the `send` half is used here.
    state: Arc<StreamState>,
    /// Stream id, carried for log/trace context only.
    stream_id: u32,
}
495
496impl H2OutboundReader {
497    pub(super) fn new(state: Arc<StreamState>, stream_id: u32) -> Self {
498        Self { state, stream_id }
499    }
500}
501
impl AsyncRead for H2OutboundReader {
    /// Drain bytes queued by the upgrade handler's `poll_write` into `out`.
    ///
    /// `Ready(n > 0)` when bytes were available, `Ready(0)` at EOF (queue empty and
    /// close requested), otherwise `Pending` with the waker registered.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        out: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        if out.is_empty() {
            return Poll::Ready(Ok(0));
        }

        let send = &self.state.send;
        let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
        let take = out.len().min(outbound.len());
        if take > 0 {
            out[..take].copy_from_slice(&outbound[..take]);
            outbound.ignore_front(take);
            log::trace!(
                "h2 stream {}: H2OutboundReader::poll_read drained {take} bytes",
                self.stream_id,
            );
            // Drop the lock before waking — the writer reacquires it on resume.
            drop(outbound);
            // Surface flow-control backpressure: wake any writer parked on
            // `outbound_write_waker` because the cap was hit. Registered-but-still-full
            // is harmless — the writer's recheck under lock observes the new len.
            send.outbound_write_waker.wake();
            return Poll::Ready(Ok(take));
        }

        // Queue empty. Register first, then re-check the close flag. This closes the
        // register-then-check race against `poll_close` (which doesn't take the buf
        // lock — it just stores the flag and fires the waker). Holding the buf lock
        // means `poll_write` can't race here; only `poll_close` can.
        send.outbound_waker.register(cx.waker());

        if send.outbound_close_requested.load(Ordering::Acquire) {
            log::trace!(
                "h2 stream {}: H2OutboundReader::poll_read EOF (close_requested + empty)",
                self.stream_id,
            );
            return Poll::Ready(Ok(0));
        }
        Poll::Pending
    }
}
547
/// What the conn task hands the driver to begin a send on a stream.
///
/// `body` carries either a normal response body or, for extended-CONNECT (RFC 8441)
/// upgrades, a streaming body that reads from [`SendState::outbound`] (which the upgrade
/// handler's [`H2Transport`] `AsyncWrite` writes into). Trailers (if any) come from
/// [`Body::trailers`] after drain — not a separate field.
///
/// `is_upgrade` flips the driver's completion semantics: instead of signaling
/// [`SubmitSend`][super::SubmitSend] completion after the body is fully on the wire, the
/// driver signals completion as soon as the response HEADERS frame is flushed. That lets
/// [`Conn::send_h2`][crate::Conn::send_h2] return so the runtime can dispatch
/// [`Handler::upgrade`][trillium::Handler::upgrade], while the body keeps streaming in the
/// background.
#[derive(Debug)]
pub(super) struct Submission {
    /// Owned pseudo-headers for the response/request. Combined with `headers` on the driver
    /// task to form a [`FieldSection`] which is then HPACK-encoded synchronously via
    /// [`HpackEncoder`][crate::headers::hpack::HpackEncoder] at submission pickup. The
    /// encoder runs only on the driver task: each pickup-tick encodes its submissions
    /// against the live dynamic-table state, then frames HEADERS in the order they were
    /// encoded — matching the wire-emission order that HPACK's stateful decoder requires.
    pub(super) pseudos: PseudoHeaders<'static>,
    /// Owned headers for the block. Cloned from the conn task's `request_headers` /
    /// `response_headers` so those remain readable to caller and middleware after the send.
    pub(super) headers: Headers,
    /// Response/request body, if any; `None` means a bodiless HEADERS(+END_STREAM) send.
    pub(super) body: Option<Body>,
    /// When `true`, completion is signaled at HEADERS-flush rather than body-drain (see above).
    pub(super) is_upgrade: bool,
}
576
577impl Submission {
578    /// Borrow this submission's headers as a [`FieldSection`] for encoding.
579    pub(super) fn field_section(&self) -> FieldSection<'_> {
580        FieldSection::new(self.pseudos.clone(), &self.headers)
581    }
582}
583
#[cfg(test)]
mod tests {
    use super::*;
    use crate::HttpContext;
    use futures_lite::{AsyncRead, AsyncWrite};
    use std::{
        sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
        },
        task::{Context, Poll, Wake, Waker},
    };

    /// Test waker that latches a flag when fired, so a test can assert whether
    /// (not how many times) a wake was delivered.
    struct CountingWaker(AtomicBool);
    impl Wake for CountingWaker {
        fn wake(self: Arc<Self>) {
            self.0.store(true, Ordering::Release);
        }
    }

    /// Build an `(H2Transport, H2OutboundReader)` pair sharing one `StreamState`,
    /// with `response_buffer_max_len` set to `cap` — the writer/reader halves of
    /// the upgrade-path outbound queue.
    fn pair_with_cap(cap: usize) -> (H2Transport, H2OutboundReader) {
        let mut context = HttpContext::new();
        context.config.response_buffer_max_len = cap;
        let connection = H2Connection::new(Arc::new(context));
        let state = Arc::new(StreamState::default());
        let transport = H2Transport::new(connection.clone(), 1, state.clone());
        let reader = H2OutboundReader::new(state, 1);
        (transport, reader)
    }

    #[test]
    fn poll_write_caps_at_response_buffer_max_len() {
        // Cap of 16 bytes. Writing 32 bytes should accept exactly 16 (partial-write
        // semantics; AsyncWriteExt::write_all retries the rest).
        let (mut transport, _reader) = pair_with_cap(16);
        let waker = Waker::from(Arc::new(CountingWaker(AtomicBool::new(false))));
        let mut cx = Context::from_waker(&waker);

        let buf = [0u8; 32];
        match Pin::new(&mut transport).poll_write(&mut cx, &buf) {
            Poll::Ready(Ok(n)) => assert_eq!(n, 16, "should accept exactly cap bytes"),
            other => panic!("expected Ready(Ok(16)), got {other:?}"),
        }
    }

    #[test]
    fn poll_write_returns_pending_when_full_and_drain_wakes() {
        // Exercises the backpressure round-trip: fill to cap → Pending (no spurious
        // wake) → reader drains → writer woken → retry succeeds.
        let (mut transport, mut reader) = pair_with_cap(8);
        let counting = Arc::new(CountingWaker(AtomicBool::new(false)));
        let writer_waker = Waker::from(counting.clone());
        let mut writer_cx = Context::from_waker(&writer_waker);

        // Fill the buffer to the cap.
        let buf = [0u8; 8];
        match Pin::new(&mut transport).poll_write(&mut writer_cx, &buf) {
            Poll::Ready(Ok(8)) => {}
            other => panic!("expected Ready(Ok(8)), got {other:?}"),
        }

        // Next write must return Pending — buffer is at cap.
        let extra = [0u8; 4];
        match Pin::new(&mut transport).poll_write(&mut writer_cx, &extra) {
            Poll::Pending => {}
            other => panic!("expected Pending, got {other:?}"),
        }
        assert!(
            !counting.0.load(Ordering::Acquire),
            "writer waker should not have fired yet"
        );

        // Drain via the reader — this should wake the writer.
        let reader_waker = Waker::noop().clone();
        let mut reader_cx = Context::from_waker(&reader_waker);
        let mut sink = [0u8; 4];
        match Pin::new(&mut reader).poll_read(&mut reader_cx, &mut sink) {
            Poll::Ready(Ok(4)) => {}
            other => panic!("expected Ready(Ok(4)), got {other:?}"),
        }
        assert!(
            counting.0.load(Ordering::Acquire),
            "drain should have woken the writer"
        );

        // Re-poll the writer — there's now room for the 4 extra bytes.
        match Pin::new(&mut transport).poll_write(&mut writer_cx, &extra) {
            Poll::Ready(Ok(4)) => {}
            other => panic!("expected Ready(Ok(4)), got {other:?}"),
        }
    }
}
673}