trillium-http 1.3.1

the http implementation for the trillium toolkit
Documentation
//! Conn-task → driver submission API.
//!
//! Server-side: [`H2Connection::submit_send`][super::H2Connection::submit_send] /
//! [`submit_upgrade`][super::H2Connection::submit_upgrade] hand a fully-encoded response off
//! to the driver for framing. Client-side: [`open_stream`][super::H2Connection::open_stream] /
//! [`open_connect_stream`][super::H2Connection::open_connect_stream] allocate a fresh
//! peer-initiated stream id and stage a request submission.
//!
//! All four entry points share the same shape: lock the streams map, transition the
//! per-stream lifecycle to [`StreamLifecycle::Submitted`], raise `needs_servicing`, wake
//! the driver. The returned [`SubmitSend`] future resolves once the driver signals send
//! completion (early at `END_HEADERS` for upgrades; after `END_STREAM` otherwise).

use super::H2Connection;
#[cfg(feature = "unstable")]
use crate::h2::transport::H2Transport;
use crate::{
    Body, Headers,
    h2::{
        lifecycle::{StreamLifecycle, Submission},
        transport::StreamState,
    },
    headers::hpack::PseudoHeaders,
};
use std::{
    future::Future,
    io,
    pin::Pin,
    sync::{Arc, atomic::Ordering},
    task::{Context, Poll},
};

/// Future returned by the various send-staging primitives on [`H2Connection`]; resolves once
/// the driver has fully framed and flushed the submitted message (request on the client,
/// response on the server), or with the relevant `io::Error` on failure.
#[must_use = "futures do nothing unless awaited"]
#[derive(Debug)]
pub struct SubmitSend {
    pub(super) stream_id: u32,
    /// `None` if the stream wasn't in the map at submit time (already closed). The future
    /// surfaces that as `NotConnected`.
    pub(super) stream: Option<Arc<StreamState>>,
}

impl Future for SubmitSend {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Some(state) = &self.stream else {
            log::debug!("h2 stream {}: submit_send on closed stream", self.stream_id);
            return Poll::Ready(Err(io::ErrorKind::NotConnected.into()));
        };

        let stream_id = self.stream_id;
        let try_take = || -> Option<io::Result<()>> {
            state.send.completed.load(Ordering::Acquire).then(|| {
                state
                    .send
                    .completion_result
                    .lock()
                    .expect("completion_result mutex poisoned")
                    .take()
                    .unwrap_or_else(|| {
                        log::error!(
                            "h2 stream {stream_id}: completed without a completion_result — \
                             driver should write the result before flipping completed"
                        );
                        Ok(())
                    })
            })
        };

        if let Some(result) = try_take() {
            return Poll::Ready(result);
        }
        state.send.completion_waker.register(cx.waker());
        // Re-check after registering so we don't miss a wake fired between the load above
        // and the registration.
        if let Some(result) = try_take() {
            return Poll::Ready(result);
        }
        Poll::Pending
    }
}

impl H2Connection {
    /// Hand a response off to the driver for framing and transmission.
    ///
    /// The conn task stages owned `pseudos + headers + body` in the per-stream submission
    /// slot and `await`s the returned future. The driver HPACK-encodes the headers, frames
    /// HEADERS + DATA, and signals completion.
    ///
    /// Trailers are not a separate argument: the driver pulls them off the body via
    /// [`Body::trailers`] once the body is fully drained.
    pub(crate) fn submit_send(
        &self,
        stream_id: u32,
        pseudos: PseudoHeaders<'static>,
        headers: Headers,
        body: Option<Body>,
    ) -> SubmitSend {
        let stream = self.streams_lock().get(&stream_id).cloned();
        if let Some(state) = &stream {
            stage_submission(
                state,
                Submission {
                    pseudos,
                    headers,
                    body,
                    is_upgrade: false,
                },
            );
            self.outbound_waker.wake();
        }
        SubmitSend { stream_id, stream }
    }

    /// Hand a response off for an extended-CONNECT (RFC 8441) upgrade.
    ///
    /// Frames the response HEADERS without `END_STREAM` and signals [`SubmitSend`]
    /// completion the moment the HEADERS frame is on the wire — earlier than
    /// [`submit_send`][Self::submit_send], which waits for the body to finish. The early
    /// completion lets the upgrade handler take over while the stream stays open as a
    /// bidirectional byte channel.
    ///
    /// The stream's body is sourced from the per-stream outbound queue; the upgrade handler
    /// writes bytes through the returned transport and the send pump frames them as DATA
    /// bounded by per-stream + connection send windows. Closing or dropping the transport
    /// emits `DATA(END_STREAM)` and tears the stream down.
    pub(crate) fn submit_upgrade(
        &self,
        stream_id: u32,
        pseudos: PseudoHeaders<'static>,
        headers: Headers,
    ) -> SubmitSend {
        let stream = self.streams_lock().get(&stream_id).cloned();
        if let Some(state) = &stream {
            let reader = crate::h2::transport::H2OutboundReader::new(state.clone(), stream_id);
            let body = Body::new_streaming(reader, None);
            stage_submission(
                state,
                Submission {
                    pseudos,
                    headers,
                    body: Some(body),
                    is_upgrade: true,
                },
            );
            log::trace!("h2 stream {stream_id}: submit_upgrade — submission staged");
            self.outbound_waker.wake();
        }
        SubmitSend { stream_id, stream }
    }

    /// Stage trailing HEADERS for an active upgrade stream and close the outbound write
    /// half. Fire-and-forget — the driver task emits the trailing HEADERS frame with
    /// `END_STREAM` and tears the stream down.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::NotConnected`] if the stream is no longer in the
    /// connection's streams map.
    pub(crate) fn submit_trailers(&self, stream_id: u32, trailers: Headers) -> io::Result<()> {
        let stream = self
            .streams_lock()
            .get(&stream_id)
            .cloned()
            .ok_or(io::ErrorKind::NotConnected)?;
        // Lifecycle transition `UpgradeOpen → UpgradeClosing(trailers)`. Stages the
        // trailers as the variant payload so the driver's send pump picks them up at the
        // Body → Trailers transition. No-op on any non-`UpgradeOpen` state (already
        // closing / already reset / not an upgrade) — `Ok(())` is returned in those cases
        // since the trailers can't physically land on the wire but the caller's intent is
        // best-effort.
        let mut lifecycle = stream.lifecycle_lock();
        match &*lifecycle {
            StreamLifecycle::UpgradeOpen { recv_eof } => {
                *lifecycle = StreamLifecycle::UpgradeClosing {
                    recv_eof: *recv_eof,
                    pending_trailers: Some(trailers),
                };
            }
            StreamLifecycle::UpgradeClosing { recv_eof, .. } => {
                // Already closing — overwrite the pending trailers slot with the newer
                // set. (Realistically `submit_trailers` is called once, but a second call
                // shouldn't silently drop the input.)
                *lifecycle = StreamLifecycle::UpgradeClosing {
                    recv_eof: *recv_eof,
                    pending_trailers: Some(trailers),
                };
            }
            _ => {
                log::trace!(
                    "h2 stream {stream_id}: submit_trailers on lifecycle {:?} — dropped",
                    *lifecycle,
                );
                return Ok(());
            }
        }
        drop(lifecycle);
        stream.needs_servicing.store(true, Ordering::Release);
        stream.send.outbound_waker.wake();
        self.outbound_waker.wake();
        log::trace!("h2 stream {stream_id}: submit_trailers staged trailers + close");
        Ok(())
    }

    /// Client-role primitive: allocate a fresh outbound stream id, stage a request submission
    /// for the driver, and return the id, a [`SubmitSend`] tracking the request's send half,
    /// and the per-stream [`H2Transport`] for response-body reads.
    ///
    /// `pseudos + headers` are handed owned to the driver, which encodes them synchronously
    /// at submission pickup. `body` is the request body, if any; `None` causes the HEADERS
    /// frame to carry `END_STREAM` and no DATA to be emitted.
    ///
    /// Returns `None` when:
    /// - The 2^31 odd-id space is exhausted (caller should fail over to a new connection), or
    /// - The connection is shutting down (GOAWAY received or local shutdown requested).
    ///
    /// The returned [`SubmitSend`] resolves once the request has been fully framed and
    /// flushed, or with the relevant `io::Error` on failure. The response side is awaited
    /// separately via [`response_headers`][Self::response_headers] for the response HEADERS,
    /// and the [`H2Transport`]'s `AsyncRead` impl for the response body.
    ///
    /// **`SubmitSend` is drop-safe.** Once handed off here, the body is owned by the driver,
    /// which continues to drain it, frame DATA, emit trailers / `END_STREAM`, and tear the
    /// stream down whether or not the caller awaits the returned future. Clients that only
    /// care about the response (the typical case) may drop it without polling.
    ///
    /// # Panics
    ///
    /// Panics if any per-connection or per-stream mutex is poisoned.
    #[cfg(feature = "unstable")]
    pub fn open_stream(
        self: &Arc<Self>,
        pseudos: PseudoHeaders<'static>,
        headers: Headers,
        body: Option<Body>,
    ) -> Option<(u32, SubmitSend, H2Transport)> {
        self.open_stream_inner(pseudos, headers, body, false)
            .map(|(id, state, transport)| {
                (
                    id,
                    SubmitSend {
                        stream_id: id,
                        stream: Some(state),
                    },
                    transport,
                )
            })
    }

    /// Client-role: open a stream for an extended-CONNECT bootstrap (RFC 8441 for
    /// WebSocket-over-h2; `draft-ietf-webtrans-http2` for WebTransport-over-h2).
    ///
    /// `headers_plan` is the HPACK plan for the HEADERS block; the caller is responsible
    /// for ensuring it carries `:method = CONNECT` and a `:protocol` pseudo-header. The
    /// initial HEADERS is sent without `END_STREAM`; the per-stream outbound queue becomes
    /// the request body and stays open until the application closes the returned transport.
    ///
    /// Returns `(stream_id, H2Transport)` — no [`SubmitSend`]. The application reads
    /// response HEADERS via [`Self::response_headers`] and then exchanges bytes over the
    /// returned transport's `AsyncRead` + `AsyncWrite`.
    ///
    /// Returns `None` under the same conditions as [`Self::open_stream`]: stream-id space
    /// exhausted, or connection shutting down.
    ///
    /// **Caller MUST first await [`peer_settings`][Self::peer_settings] and verify the
    /// returned snapshot's `enable_connect_protocol() == Some(true)` before calling this.**
    /// Sending extended CONNECT to a peer that hasn't advertised
    /// `SETTINGS_ENABLE_CONNECT_PROTOCOL = 1` is a protocol violation.
    #[cfg(feature = "unstable")]
    pub fn open_connect_stream(
        self: &Arc<Self>,
        pseudos: PseudoHeaders<'static>,
        headers: Headers,
    ) -> Option<(u32, H2Transport)> {
        let (id, _state, transport) = self.open_stream_inner(pseudos, headers, None, true)?;
        Some((id, transport))
    }

    /// Shared id-allocate-and-stage logic backing [`Self::open_stream`] and
    /// [`Self::open_connect_stream`]. The `is_upgrade` flag controls two things in the driver's
    /// send pump: HEADERS does not carry `END_STREAM` (because the body field is `Some`), and
    /// the body is sourced from the per-stream outbound queue ([`H2OutboundReader`]) rather
    /// than the caller-supplied `Body`. For the non-upgrade path, the caller-supplied `body`
    /// is used as-is and `END_STREAM` semantics fall out of `body.is_none()`.
    ///
    /// [`H2OutboundReader`]: crate::h2::transport::H2OutboundReader
    #[cfg(feature = "unstable")]
    fn open_stream_inner(
        self: &Arc<Self>,
        pseudos: PseudoHeaders<'static>,
        headers: Headers,
        body: Option<Body>,
        is_upgrade: bool,
    ) -> Option<(u32, Arc<StreamState>, H2Transport)> {
        if !self.swansong.state().is_running() {
            return None;
        }

        let stream_id = self
            .next_client_stream_id
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                (n < (1u32 << 31)).then_some(n + 2)
            })
            .ok()?;

        let state = Arc::new(StreamState::default());

        // Extended-CONNECT bootstrap: sourcing the body from the per-stream outbound
        // queue keeps HEADERS flowing without END_STREAM and lets the application's
        // writes through `H2Transport`'s `AsyncWrite` flow out as DATA frames.
        let body = if is_upgrade {
            let reader = crate::h2::transport::H2OutboundReader::new(state.clone(), stream_id);
            Some(Body::new_streaming(reader, None))
        } else {
            body
        };

        // Stage submission *before* publishing the stream id to the shared map so the
        // submission is guaranteed visible the first time the driver sees the stream —
        // otherwise a second tick would be needed to start framing.
        stage_submission(
            &state,
            Submission {
                pseudos,
                headers,
                body,
                is_upgrade,
            },
        );
        self.streams_lock().insert(stream_id, state.clone());
        log::trace!("h2 client: open_stream allocated stream {stream_id} (upgrade={is_upgrade})");
        self.outbound_waker.wake();
        let transport = H2Transport::new(Arc::clone(self), stream_id, state.clone());
        Some((stream_id, state, transport))
    }
}

/// Transition a stream's lifecycle to [`StreamLifecycle::Submitted`] (preserving
/// `recv_eof` from the prior state) and raise the per-stream `needs_servicing` mailbox
/// flag. The caller is responsible for waking the connection-level driver waker.
///
/// No-op (warning logged) if the stream is in a terminal variant — the submission can't
/// land on a stream that's already reset or released, and the staged headers/body would
/// just leak.
fn stage_submission(state: &Arc<StreamState>, submission: Submission) {
    let mut lifecycle = state.lifecycle_lock();
    let recv_eof = lifecycle.recv_eof();
    match &*lifecycle {
        StreamLifecycle::Idle { .. } | StreamLifecycle::Submitted { .. } => {
            *lifecycle = StreamLifecycle::Submitted {
                submission: Box::new(submission),
                recv_eof,
            };
        }
        _ => {
            log::warn!(
                "h2 stage_submission on lifecycle {:?} — dropped",
                *lifecycle,
            );
            return;
        }
    }
    drop(lifecycle);
    state.needs_servicing.store(true, Ordering::Release);
}