use crate::{
h3::{H3Error, H3ErrorCode},
headers::qpack::{
EncoderDynamicTable,
instruction::decoder::{DecoderInstruction, parse},
},
};
use futures_lite::io::AsyncRead;
impl EncoderDynamicTable {
pub(crate) async fn run_reader<T>(&self, stream: &mut T) -> Result<(), H3Error>
where
T: AsyncRead + Unpin + Send,
{
log::trace!("QPACK decoder stream reader: started");
let result = process(stream, self).await;
match &result {
Ok(()) => log::trace!("QPACK decoder stream reader: clean EOF"),
Err(H3Error::Protocol(code)) => {
log::debug!("QPACK decoder stream reader: protocol error: {code}");
self.fail(*code);
}
Err(H3Error::Io(e)) => {
log::debug!("QPACK decoder stream reader: I/O error: {e}");
self.fail(H3ErrorCode::QpackDecoderStreamError);
}
}
result
}
}
async fn process<T>(stream: &mut T, table: &EncoderDynamicTable) -> Result<(), H3Error>
where
T: AsyncRead + Unpin + Send,
{
while let Some(instruction) = parse(stream).await? {
apply(table, instruction)?;
}
Ok(())
}
fn apply(table: &EncoderDynamicTable, instruction: DecoderInstruction) -> Result<(), H3Error> {
match instruction {
DecoderInstruction::SectionAcknowledgement { stream_id } => {
log::trace!(
"QPACK decoder stream reader: Section Acknowledgement stream_id={stream_id}"
);
table.on_section_ack(stream_id)
}
DecoderInstruction::StreamCancellation { stream_id } => {
log::trace!("QPACK decoder stream reader: Stream Cancellation stream_id={stream_id}");
table.on_stream_cancel(stream_id);
Ok(())
}
DecoderInstruction::InsertCountIncrement { increment } => {
log::trace!("QPACK decoder stream reader: Insert Count Increment {increment}");
table.on_insert_count_increment(increment)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
h3::H3Settings,
headers::{
entry_name::EntryName,
qpack::{FieldLineValue, encoder_dynamic_table::SectionRefs},
},
};
use futures_lite::future::block_on;
fn qen(s: &str) -> EntryName<'static> {
EntryName::try_from(s.as_bytes().to_vec()).unwrap()
}
fn fv(s: &'static str) -> FieldLineValue<'static> {
FieldLineValue::Static(s.as_bytes())
}
fn make_table_with_two_entries() -> EncoderDynamicTable {
let table = EncoderDynamicTable::default();
table.initialize_from_peer_settings(
H3Settings::default().with_qpack_max_table_capacity(4096),
);
table.insert(qen("a"), fv("1")).unwrap();
table.insert(qen("b"), fv("2")).unwrap();
table
}
fn push_section(table: &EncoderDynamicTable, stream_id: u64, ric: u64, min_ref: Option<u64>) {
table.register_outstanding_section(
stream_id,
SectionRefs {
required_insert_count: ric,
min_ref_abs_idx: min_ref,
},
);
}
#[test]
fn parses_section_ack() {
let table = make_table_with_two_entries();
push_section(&table, 4, 2, Some(0));
let mut wire: &[u8] = &[0x84];
block_on(table.run_reader(&mut wire)).unwrap();
assert_eq!(table.known_received_count(), 2);
}
#[test]
fn parses_insert_count_increment() {
let table = make_table_with_two_entries();
let mut wire: &[u8] = &[0x01];
block_on(table.run_reader(&mut wire)).unwrap();
assert_eq!(table.known_received_count(), 1);
}
#[test]
fn parses_stream_cancellation() {
let table = make_table_with_two_entries();
push_section(&table, 4, 2, Some(0));
let mut wire: &[u8] = &[0x44];
block_on(table.run_reader(&mut wire)).unwrap();
assert_eq!(table.known_received_count(), 0);
}
#[test]
fn parses_multiple_instructions() {
let table = make_table_with_two_entries();
push_section(&table, 4, 1, Some(0));
let mut wire: &[u8] = &[0x84, 0x01];
block_on(table.run_reader(&mut wire)).unwrap();
assert_eq!(table.known_received_count(), 2);
}
#[test]
fn multi_byte_varint() {
let table = make_table_with_two_entries();
push_section(&table, 200, 2, Some(0));
let mut wire: &[u8] = &[0xFF, 0x49];
block_on(table.run_reader(&mut wire)).unwrap();
assert_eq!(table.known_received_count(), 2);
}
#[test]
fn protocol_error_marks_table_failed() {
let table = EncoderDynamicTable::default();
let mut wire: &[u8] = &[0x84];
let err = block_on(table.run_reader(&mut wire));
assert!(err.is_err());
assert!(table.failed().is_some());
}
#[test]
fn clean_eof_returns_ok() {
let table = EncoderDynamicTable::default();
let mut wire: &[u8] = &[];
block_on(table.run_reader(&mut wire)).unwrap();
assert!(table.failed().is_none());
}
}