trillium-http 1.2.0

the http implementation for the trillium toolkit
Documentation
//! Reads the peer's QPACK decoder stream (RFC 9204 §4.4).
//!
//! Runs as a connection-scoped task: parses a continuous stream of Section Acknowledgement,
//! Stream Cancellation, and Insert Count Increment instructions via
//! [`instruction::decoder::parse`] and applies them to the [`EncoderDynamicTable`]. Returns
//! on clean EOF, shutdown, I/O error, or protocol error; on protocol/I/O error the table is
//! marked failed so the encoder-stream writer exits too.

use crate::{
    h3::{H3Error, H3ErrorCode},
    headers::qpack::{
        EncoderDynamicTable,
        instruction::decoder::{DecoderInstruction, parse},
    },
};
use futures_lite::io::AsyncRead;

impl EncoderDynamicTable {
    /// Process the peer's decoder-stream instructions and apply them to `self`.
    ///
    /// Reads until EOF (clean stream close) or a protocol / I/O error. On any error, marks the
    /// table as failed so that the encoder-stream writer task also exits.
    ///
    /// # Errors
    ///
    /// Returns an `H3Error` on I/O or protocol failure.
    pub(crate) async fn run_reader<T>(&self, stream: &mut T) -> Result<(), H3Error>
    where
        T: AsyncRead + Unpin + Send,
    {
        log::trace!("QPACK decoder stream reader: started");
        // RFC 9204 §4.2: closure of the decoder stream is a connection error of type
        // H3_CLOSED_CRITICAL_STREAM. process_instructions returns Ok on clean EOF; here we
        // promote that to an error and mark the table failed. Graceful server-side
        // shutdown drops this future via swansong before reaching that arm.
        let result = match self.process_instructions(stream).await {
            Ok(()) => {
                log::debug!(
                    "QPACK decoder stream reader: peer closed (FIN) — H3_CLOSED_CRITICAL_STREAM"
                );
                Err(H3ErrorCode::ClosedCriticalStream.into())
            }
            Err(e) => Err(e),
        };
        match &result {
            Err(H3Error::Protocol(code)) => {
                log::debug!("QPACK decoder stream reader: protocol error: {code}");
                self.fail(*code);
            }
            Err(H3Error::Io(e)) => {
                log::debug!("QPACK decoder stream reader: I/O error: {e}");
                self.fail(H3ErrorCode::QpackDecoderStreamError);
            }
            // unreachable given the EOF promotion above; defensively a no-op.
            Ok(()) => {}
        }
        result
    }

    /// Loop-body of [`run_reader`] separated for tests and corpus replay: parse and apply
    /// peer instructions until clean EOF or error, but **do not** convert EOF into
    /// `H3_CLOSED_CRITICAL_STREAM` and **do not** mark the table failed. Production wiring
    /// goes through [`run_reader`], which does both per RFC 9204 §4.2.
    pub(crate) async fn process_instructions<T>(&self, stream: &mut T) -> Result<(), H3Error>
    where
        T: AsyncRead + Unpin + Send,
    {
        while let Some(instruction) = parse(stream).await? {
            apply(self, instruction)?;
        }
        Ok(())
    }
}

fn apply(table: &EncoderDynamicTable, instruction: DecoderInstruction) -> Result<(), H3Error> {
    match instruction {
        DecoderInstruction::SectionAcknowledgement { stream_id } => {
            log::trace!(
                "QPACK decoder stream reader: Section Acknowledgement stream_id={stream_id}"
            );
            table.on_section_ack(stream_id)
        }
        DecoderInstruction::StreamCancellation { stream_id } => {
            log::trace!("QPACK decoder stream reader: Stream Cancellation stream_id={stream_id}");
            table.on_stream_cancel(stream_id);
            Ok(())
        }
        DecoderInstruction::InsertCountIncrement { increment } => {
            log::trace!("QPACK decoder stream reader: Insert Count Increment {increment}");
            table.on_insert_count_increment(increment)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        h3::H3Settings,
        headers::{
            entry_name::EntryName,
            qpack::{FieldLineValue, encoder_dynamic_table::SectionRefs},
        },
    };
    use futures_lite::future::block_on;

    fn qen(s: &str) -> EntryName<'static> {
        EntryName::try_from(s.as_bytes().to_vec()).unwrap()
    }

    fn fv(s: &'static str) -> FieldLineValue<'static> {
        FieldLineValue::Static(s.as_bytes())
    }

    fn make_table_with_two_entries() -> EncoderDynamicTable {
        // `EncoderDynamicTable::default()` reads `dynamic_table_capacity` from the default
        // `HttpContext`/`HttpConfig`, which is 4096 — matches what we want here.
        let table = EncoderDynamicTable::default();
        table.initialize_from_peer_settings(
            H3Settings::default().with_qpack_max_table_capacity(4096),
        );
        table.insert(qen("a"), fv("1")).unwrap();
        table.insert(qen("b"), fv("2")).unwrap();
        table
    }

    fn push_section(table: &EncoderDynamicTable, stream_id: u64, ric: u64, min_ref: Option<u64>) {
        table.register_outstanding_section(
            stream_id,
            SectionRefs {
                required_insert_count: ric,
                min_ref_abs_idx: min_ref,
            },
        );
    }

    #[test]
    fn parses_section_ack() {
        let table = make_table_with_two_entries();
        push_section(&table, 4, 2, Some(0));
        // Section Ack for stream ID 4: 0x80 | 4 = 0x84
        let mut wire: &[u8] = &[0x84];
        block_on(table.process_instructions(&mut wire)).unwrap();
        assert_eq!(table.known_received_count(), 2);
    }

    #[test]
    fn parses_insert_count_increment() {
        let table = make_table_with_two_entries();
        // ICI increment=1: 0x00 | 1 = 0x01
        let mut wire: &[u8] = &[0x01];
        block_on(table.process_instructions(&mut wire)).unwrap();
        assert_eq!(table.known_received_count(), 1);
    }

    #[test]
    fn parses_stream_cancellation() {
        let table = make_table_with_two_entries();
        push_section(&table, 4, 2, Some(0));
        // Stream Cancel stream_id=4: 0x40 | 4 = 0x44
        let mut wire: &[u8] = &[0x44];
        block_on(table.process_instructions(&mut wire)).unwrap();
        assert_eq!(table.known_received_count(), 0);
    }

    #[test]
    fn parses_multiple_instructions() {
        let table = make_table_with_two_entries();
        push_section(&table, 4, 1, Some(0));
        // Section Ack stream 4, then ICI +1: expects total known_received = 2.
        let mut wire: &[u8] = &[0x84, 0x01];
        block_on(table.process_instructions(&mut wire)).unwrap();
        assert_eq!(table.known_received_count(), 2);
    }

    #[test]
    fn multi_byte_varint() {
        let table = make_table_with_two_entries();
        // Push enough sections that we can ack a large stream id.
        push_section(&table, 200, 2, Some(0));
        // Section Ack for stream 200 needs a multi-byte varint.
        // 7-bit prefix: first byte = 0x80 | 0x7F = 0xFF, then 200 - 127 = 73 = 0x49.
        let mut wire: &[u8] = &[0xFF, 0x49];
        block_on(table.process_instructions(&mut wire)).unwrap();
        assert_eq!(table.known_received_count(), 2);
    }

    #[test]
    fn protocol_error_marks_table_failed() {
        let table = EncoderDynamicTable::default();
        // Section Ack with no outstanding section is a protocol error.
        let mut wire: &[u8] = &[0x84];
        let err = block_on(table.run_reader(&mut wire));
        assert!(err.is_err());
        assert!(table.failed().is_some());
    }

    #[test]
    fn peer_eof_is_closed_critical_stream() {
        // RFC 9204 §4.2: peer FIN on the decoder stream is H3_CLOSED_CRITICAL_STREAM.
        let table = EncoderDynamicTable::default();
        let mut wire: &[u8] = &[];
        let err = block_on(table.run_reader(&mut wire)).unwrap_err();
        assert!(matches!(
            err,
            H3Error::Protocol(H3ErrorCode::ClosedCriticalStream)
        ));
        assert_eq!(table.failed(), Some(H3ErrorCode::ClosedCriticalStream));
    }
}