trillium-server-common 0.7.4

server utilities for trillium.rs
Documentation
//! HTTP/2 specific dispatch for runtime adapters.
//!
//! Entry point used by [`running_config`][crate::running_config]: when the TLS acceptor
//! signals `h2` via ALPN, or when a connection (cleartext or TLS-without-ALPN-h2) presents
//! the HTTP/2 client preface, the adapter hands the transport to [`run_h2`]. This module
//! owns the per-connection driver loop and per-stream task spawning that mirrors
//! [`h3::run_h3`][crate::h3::run_h3].

use crate::{Acceptor, ArcHandler, Runtime, Server, running_config::RunningConfig};
use futures_lite::io::{AsyncRead, AsyncWrite};
use std::{
    io,
    net::{IpAddr, SocketAddr},
    pin::Pin,
    task::{Context, Poll},
};
use trillium::{Conn, Handler, KnownHeaderName, Transport, Upgrade};
use trillium_http::h2::{H2Connection, H2Transport};

/// HTTP/2 client connection preface (RFC 9113 §3.4). The first 24 bytes every HTTP/2 client
/// sends before any frames; the prior-knowledge dispatch path peeks the (cleartext or
/// post-TLS) stream for these bytes to decide between HTTP/1.1 and HTTP/2.
pub(crate) const CLIENT_PREFACE: &[u8; 24] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

impl<S: Server, A: Acceptor<S::Transport>> RunningConfig<S, A> {
    /// Drive an HTTP/2 connection end-to-end: construct the [`H2Connection`], run its driver
    /// loop, and spawn a per-stream task running the user handler for every emitted [`Conn`].
    ///
    /// `peer_ip`, the listener provenance, and the transport's security are populated onto each
    /// per-stream [`Conn`] before the handler runs, matching [`crate::running_config`]'s HTTP/1.1
    /// path. The listener, when present, is inserted into the conn's state — both the
    /// [`Listener`](trillium::Listener) itself and, for an addressed listener, its [`SocketAddr`]
    /// as the ingress address. The
    /// configured `alt-svc`, when present, is set as the response header on every emitted stream.
    /// Security reflects the underlying transport: cleartext h2c is `false`; ALPN-negotiated h2 and
    /// TLS-prior-knowledge h2 both reflect whatever the TLS acceptor reports.
    pub(crate) async fn run_h2<T>(
        &self,
        transport: T,
        handler: ArcHandler<impl Handler>,
        peer_ip: Option<IpAddr>,
    ) where
        T: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
    {
        let runtime: Runtime = self.runtime.clone().into();
        let listener = self.listener.clone();
        let local_alt_svc = self.local_alt_svc;
        let is_secure = self.acceptor.is_secure();

        let h2 = H2Connection::new(self.context.clone());
        let mut driver = h2.clone().run(transport);

        while let Some(result) = driver.next().await {
            match result {
                Ok(conn) => {
                    let stream_id = conn.h2_stream_id();
                    log::trace!("run_h2: spawning handler task for stream {stream_id:?}");
                    let handler = handler.clone();
                    let listener = listener.clone();
                    runtime.spawn(async move {
                        let inner_handler = handler.clone();
                        let result = H2Connection::process_inbound(conn, |mut conn| async move {
                            let handler = &inner_handler;
                            conn.set_peer_ip(peer_ip);
                            conn.set_secure(is_secure);
                            let mut conn = Conn::from(conn);
                            if let Some(listener) = listener {
                                if let Some(addr) = listener.socket_addr() {
                                    conn.insert_state(addr);
                                }
                                conn.insert_state(listener);
                            }
                            if let Some(alt_svc) = local_alt_svc {
                                conn.response_headers_mut()
                                    .try_insert(KnownHeaderName::AltSvc, alt_svc);
                            }
                            let conn = handler.run(conn).await;
                            let conn = handler.before_send(conn).await;
                            conn.into_inner::<H2Transport>()
                        })
                        .await;

                        match result {
                            Ok(conn) if conn.should_upgrade() => {
                                let upgrade = Upgrade::from(conn);
                                if handler.has_upgrade(&upgrade) {
                                    log::debug!("upgrading h2 stream");
                                    handler.upgrade(upgrade).await;
                                } else {
                                    log::error!(
                                        "h2 upgrade specified but no upgrade handler provided"
                                    );
                                }
                            }
                            Ok(_) => {}
                            Err(e) => {
                                log::debug!("h2 stream error: {e}");
                            }
                        }
                    });
                }
                Err(e) => {
                    log::debug!("h2 connection error: {e}");
                    break;
                }
            }
        }
        log::trace!("run_h2: driver exhausted, connection done");
    }
}

/// A TCP transport that first serves a pre-peeked byte prefix before reading from its wrapped
/// inner transport.
///
/// Used by the HTTP/2 prior-knowledge path in [`crate::running_config`] (both cleartext
/// h2c and TLS-without-ALPN-h2): the adapter peeks the first 24 bytes of the post-acceptor
/// transport to compare against [`CLIENT_PREFACE`]; if they match, we hand the original
/// transport *with those bytes prepended* to [`run_h2`] so the driver's preface-reading
/// state can consume them without re-reading from the wire.
///
/// Forwards [`Transport`] trait methods to the wrapped inner transport so socket options and
/// peer-addr queries behave the same as they would without the wrapper.
#[derive(Debug)]
pub(crate) struct Prefixed<T> {
    prefix: Vec<u8>,
    offset: usize,
    inner: T,
}

impl<T> Prefixed<T> {
    pub(crate) fn new(prefix: Vec<u8>, inner: T) -> Self {
        Self {
            prefix,
            offset: 0,
            inner,
        }
    }
}

impl<T: AsyncRead + Unpin> AsyncRead for Prefixed<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        if this.offset < this.prefix.len() {
            let take = (this.prefix.len() - this.offset).min(buf.len());
            buf[..take].copy_from_slice(&this.prefix[this.offset..this.offset + take]);
            this.offset += take;
            if this.offset >= this.prefix.len() {
                // Drop the peeked bytes once replayed — no further references to them, and a
                // long-lived connection shouldn't keep the allocation around.
                this.prefix = Vec::new();
            }
            return Poll::Ready(Ok(take));
        }
        Pin::new(&mut this.inner).poll_read(cx, buf)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for Prefixed<T> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.get_mut().inner).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_close(cx)
    }
}

impl<T: Transport> Transport for Prefixed<T> {
    fn set_linger(&mut self, linger: Option<std::time::Duration>) -> io::Result<()> {
        self.inner.set_linger(linger)
    }

    fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
        self.inner.set_nodelay(nodelay)
    }

    fn set_ip_ttl(&mut self, ttl: u32) -> io::Result<()> {
        self.inner.set_ip_ttl(ttl)
    }

    fn peer_addr(&self) -> io::Result<Option<SocketAddr>> {
        self.inner.peer_addr()
    }

    fn negotiated_alpn(&self) -> Option<std::borrow::Cow<'_, [u8]>> {
        self.inner.negotiated_alpn()
    }
}