trillium-http 1.0.0

the http implementation for the trillium toolkit
Documentation
use crate::{
    Buffer,
    h3::{
        H3Error,
        frame::{Frame, FrameDecodeError},
    },
};
use futures_lite::{AsyncRead, AsyncReadExt, io as async_io};
use std::io;

/// A borrowed view over an `AsyncRead` transport that yields H3 frames.
///
/// Unknown/GREASE frames are automatically skipped by [`next`](Self::next).
#[derive(Debug)]
pub struct FrameStream<'a, R> {
    /// Transport that frame bytes are read from.
    reader: &'a mut R,
    /// Bytes read from `reader` that have not been consumed yet. Frame
    /// headers are decoded from the front of this buffer.
    buf: &'a mut Buffer,
    /// Number of payload bytes left over from the previously yielded frame
    /// that must be discarded before the next frame header is decoded.
    pending_skip: u64,
}

impl<'a, R: AsyncRead + Unpin> FrameStream<'a, R> {
    /// Construct a new `FrameStream` that reads from `reader` and stages
    /// unconsumed bytes in `buf`.
    ///
    /// Any bytes already present in `buf` are decoded before the transport
    /// is read.
    pub fn new(reader: &'a mut R, buf: &'a mut Buffer) -> Self {
        Self {
            reader,
            buf,
            pending_skip: 0,
        }
    }

    /// Yield the next meaningful frame, skipping unknown/GREASE frames.
    ///
    /// Any unconsumed payload from the previous frame is automatically
    /// drained before decoding the next frame header.
    ///
    /// Returns `Ok(None)` on clean stream end (FIN before any frame header).
    ///
    /// # Errors
    ///
    /// Returns an error when we are unable to decode a frame, when the
    /// transport fails, or when the stream ends while a frame is only
    /// partially received.
    pub async fn next(&mut self) -> Result<Option<ActiveFrame<'_, 'a, R>>, H3Error> {
        if self.pending_skip > 0 {
            // NOTE(review): the counter is cleared before the skip finishes, so
            // dropping this future mid-skip loses track of the bytes that are
            // still owed. Confirm callers never cancel `next` at this point.
            let skip = self.pending_skip;
            self.pending_skip = 0;
            self.skip_bytes(skip).await?;
        }

        loop {
            match Frame::decode(self.buf) {
                Ok((Frame::Unknown(len), consumed)) => {
                    log::trace!("skipping unknown frame, payload length {len}");
                    // The header is still at the front of the buffer, so it is
                    // skipped together with the payload.
                    self.skip_bytes(len + consumed as u64).await?;
                    continue;
                }
                Ok((frame, consumed)) => {
                    self.buf.ignore_front(consumed);
                    let remaining = match &frame {
                        Frame::Data(len) | Frame::Headers(len) => *len,
                        Frame::PushPromise {
                            field_section_length,
                            ..
                        } => *field_section_length,
                        _ => 0, // control frames are fully parsed by decode
                    };
                    return Ok(Some(ActiveFrame {
                        stream: self,
                        frame,
                        remaining,
                    }));
                }
                // Not enough bytes buffered yet; fall through and read more.
                Err(FrameDecodeError::Incomplete) => {}
                Err(FrameDecodeError::Error(code)) => return Err(code.into()),
            }

            if !self.read_more().await? {
                // EOF is only a clean end of stream when no partial frame is
                // left behind; leftover bytes mean the peer truncated a frame.
                if self.buf.is_empty() {
                    return Ok(None);
                }
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "stream ended mid-frame",
                )
                .into());
            }
        }
    }

    /// Read more bytes from the transport into the buffer.
    /// Returns `false` on EOF.
    ///
    /// The buffer is grown to make room for the read and is always cut back
    /// to the bytes that were actually received, including when the read
    /// fails or this future is dropped before completing.
    async fn read_more(&mut self) -> io::Result<bool> {
        /// Truncates the buffer to its valid length when dropped.
        struct ValidLen<'b> {
            buf: &'b mut Buffer,
            len: usize,
        }

        impl Drop for ValidLen<'_> {
            fn drop(&mut self) {
                self.buf.truncate(self.len);
            }
        }

        let before = self.buf.len();
        self.buf.expand();
        // From here on the buffer holds filler past `before`; the guard makes
        // sure that filler never survives an early exit.
        let mut guard = ValidLen {
            buf: &mut *self.buf,
            len: before,
        };
        let n = self.reader.read(&mut guard.buf[before..]).await?;
        guard.len = before + n;
        Ok(n > 0)
    }

    /// Skip `n` bytes from the buffer, reading more from the transport if needed.
    ///
    /// Buffered bytes are discarded first; whatever is still owed is read
    /// from the transport and thrown away without passing through the buffer.
    ///
    /// Returns an `UnexpectedEof` error if the transport ends before `n`
    /// bytes have been skipped.
    async fn skip_bytes(&mut self, n: u64) -> io::Result<()> {
        // If `n` does not fit in `usize` it necessarily exceeds the buffer, so
        // the whole buffer is discarded.
        let from_buf = usize::try_from(n)
            .unwrap_or(self.buf.len())
            .min(self.buf.len());
        self.buf.ignore_front(from_buf);
        let remaining = n - from_buf as u64;

        if remaining > 0 {
            let copied =
                async_io::copy((&mut self.reader).take(remaining), async_io::sink()).await?;
            if copied < remaining {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "stream ended mid-frame payload",
                ));
            }
        }

        Ok(())
    }
}

/// A single H3 frame whose payload has not yet been consumed.
///
/// While this exists, it holds a mutable borrow on the parent [`FrameStream`],
/// preventing further frame decoding until this frame's payload is dealt with.
///
/// On drop, any unconsumed payload is recorded on the parent `FrameStream`
/// and will be skipped at the start of the next [`FrameStream::next`] call.
#[derive(Debug)]
pub struct ActiveFrame<'b, 'a, R> {
    /// Parent stream whose buffer and transport hold this frame's payload.
    stream: &'b mut FrameStream<'a, R>,
    /// The decoded frame header.
    frame: Frame,
    /// Payload bytes that are discarded when this frame is dropped.
    remaining: u64,
}

impl<R: AsyncRead + Unpin> ActiveFrame<'_, '_, R> {
    /// Borrow the header that was decoded for this frame.
    pub fn frame(&self) -> &Frame {
        &self.frame
    }

    /// Read until the whole payload sits in the stream's buffer, then return
    /// a view of it.
    ///
    /// Nothing is consumed by this call: the payload stays at the front of
    /// the buffer and is discarded when this `ActiveFrame` is dropped.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidData` when the payload length does not fit in
    /// `usize`, with `UnexpectedEof` when the stream ends before the payload
    /// is complete, and with any error reported by the transport.
    pub async fn buffer_payload(&mut self) -> io::Result<&[u8]> {
        let Ok(payload_len) = usize::try_from(self.remaining) else {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "frame payload too large to buffer",
            ));
        };

        while self.stream.buf.len() < payload_len {
            let got_bytes = self.stream.read_more().await?;
            if !got_bytes {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "stream ended mid-frame payload",
                ));
            }
        }

        // `remaining` is deliberately left untouched so that dropping this
        // frame still discards the bytes handed out here.
        Ok(&self.stream.buf[..payload_len])
    }
}

impl<R> Drop for ActiveFrame<'_, '_, R> {
    /// Discard this frame's payload: buffered bytes are dropped immediately
    /// and the rest is recorded on the parent stream as `pending_skip`.
    fn drop(&mut self) {
        let buffered = self.stream.buf.len();
        // A payload too large for `usize` necessarily covers the whole buffer.
        let drop_now = match usize::try_from(self.remaining) {
            Ok(outstanding) => outstanding.min(buffered),
            Err(_) => buffered,
        };
        self.stream.buf.ignore_front(drop_now);
        // Drop cannot await, so bytes that have not arrived yet are left for
        // the parent stream to skip later.
        self.stream.pending_skip = self.remaining - drop_now as u64;
    }
}