1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
//! QPACK encoder stream processing.
//!
//! The encoder stream is a unidirectional stream sent by the peer carrying instructions
//! that modify the dynamic table.
use crate::{
h3::{H3Error, H3ErrorCode},
headers::{
entry_name::EntryName,
qpack::{
decoder_dynamic_table::DecoderDynamicTable,
instruction::encoder::{EncoderInstruction, parse},
static_table::static_entry,
},
},
};
use futures_lite::io::AsyncRead;
use std::borrow::Cow;
impl DecoderDynamicTable {
    /// Drive the peer's QPACK encoder stream, applying every instruction to this dynamic
    /// table.
    ///
    /// Instructions (Set Dynamic Table Capacity, Insert With Name Reference, Insert With
    /// Literal Name, Duplicate) are read back-to-back from `stream` and applied in arrival
    /// order. This function never yields `Ok`: it finishes either with the error that
    /// interrupted processing or, if the peer closed the stream cleanly, with
    /// `H3_CLOSED_CRITICAL_STREAM`. Before returning, the table is marked failed with a
    /// matching error code so that blocked decode futures are woken with an error.
    ///
    /// # Errors
    ///
    /// * [`H3Error::Protocol`] with the code reported while processing instructions, or
    ///   with `ClosedCriticalStream` when the peer finished the stream.
    /// * [`H3Error::Io`] when processing surfaced an I/O error; the table is then failed
    ///   with `QpackEncoderStreamError`.
    pub(crate) async fn run_reader<T>(&self, stream: &mut T) -> Result<(), H3Error>
    where
        T: AsyncRead + Unpin + Send,
    {
        // The encoder stream is critical: closing it is a connection error of type
        // H3_CLOSED_CRITICAL_STREAM, so a clean EOF from `process_instructions` is turned
        // into a failure here. (Graceful server-side shutdown drops this future via
        // swansong before reaching the EOF arm; reaching it means the peer FIN'd their
        // encoder stream while our connection was still alive.)
        let failure: H3Error = match self.process_instructions(stream).await {
            Err(error) => error,
            Ok(()) => {
                log::debug!("QPACK encoder stream: peer closed (FIN) — H3_CLOSED_CRITICAL_STREAM");
                H3ErrorCode::ClosedCriticalStream.into()
            }
        };

        // Choose the code the table is failed with: protocol errors keep their own code,
        // I/O errors are reported as an encoder stream error.
        let table_code = match &failure {
            H3Error::Protocol(code) => {
                log::debug!("QPACK encoder stream: protocol error: {code}");
                *code
            }
            H3Error::Io(e) => {
                log::debug!("QPACK encoder stream: I/O error: {e}");
                H3ErrorCode::QpackEncoderStreamError
            }
        };
        self.fail(table_code);

        Err(failure)
    }

    /// Parse-and-apply loop behind [`Self::run_reader`], kept separate for tests and corpus
    /// replay.
    ///
    /// Reads peer instructions from `stream` until clean EOF or the first error. Unlike
    /// [`Self::run_reader`], this **does not** convert EOF into `H3_CLOSED_CRITICAL_STREAM`
    /// (clean EOF is `Ok(())`) and **does not** mark the table failed. Production wiring
    /// goes through [`Self::run_reader`], which does both.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by the instruction parser or by applying an
    /// instruction to the table.
    pub(crate) async fn process_instructions<T: AsyncRead + Unpin + Send>(
        &self,
        stream: &mut T,
    ) -> Result<(), H3Error> {
        // Read once up front and handed to the parser on every call.
        let size_limit = self.max_capacity();
        loop {
            match parse(size_limit, stream).await? {
                Some(instruction) => self.apply(instruction)?,
                // The parser reported end of stream with no further instruction.
                None => return Ok(()),
            }
        }
    }

    /// Apply a single parsed encoder instruction to this table.
    ///
    /// Every instruction is logged at `trace` level. Failures of `set_capacity`,
    /// `duplicate`, and of static/dynamic name lookups are additionally logged at `error`
    /// level before being returned.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `set_capacity`, `insert`, or `duplicate`, or
    /// `QpackEncoderStreamError` when a static or dynamic name reference does not resolve.
    fn apply(&self, instruction: EncoderInstruction) -> Result<(), H3Error> {
        match instruction {
            EncoderInstruction::SetCapacity(capacity) => {
                log::trace!("QPACK encoder: Set Dynamic Table Capacity {capacity}");
                let outcome = self.set_capacity(capacity);
                if let Err(e) = &outcome {
                    log::error!("QPACK encoder: set_capacity({capacity}) failed: {e:?}");
                }
                outcome
            }

            EncoderInstruction::InsertWithStaticNameRef { name_index, value } => {
                // Only the name half of the static entry is used; the value comes from the
                // instruction itself.
                let (static_name, _) = match static_entry(name_index) {
                    Ok(entry) => entry,
                    Err(e) => {
                        log::error!("QPACK encoder: static_entry({name_index}) failed: {e:?}");
                        return Err(H3ErrorCode::QpackEncoderStreamError.into());
                    }
                };
                let name = EntryName::from(*static_name);
                log::trace!(
                    "QPACK encoder: Insert With Name Reference (static) [{name}: {}]",
                    String::from_utf8_lossy(&value)
                );
                self.insert(name, Cow::Owned(value))
            }

            EncoderInstruction::InsertWithDynamicNameRef {
                relative_index,
                value,
            } => {
                let Some(name) = self.name_at_relative(relative_index) else {
                    log::error!("QPACK encoder: name_at_relative({relative_index}) returned None");
                    return Err(H3ErrorCode::QpackEncoderStreamError.into());
                };
                log::trace!(
                    "QPACK encoder: Insert With Name Reference (dynamic) [{name}: {}]",
                    String::from_utf8_lossy(&value)
                );
                self.insert(name, Cow::Owned(value))
            }

            EncoderInstruction::InsertWithLiteralName { name, value } => {
                log::trace!(
                    "QPACK encoder: Insert With Literal Name [{name}: {}]",
                    String::from_utf8_lossy(&value)
                );
                self.insert(name, Cow::Owned(value))
            }

            EncoderInstruction::Duplicate { relative_index } => {
                log::trace!("QPACK encoder: Duplicate index {relative_index}");
                let outcome = self.duplicate(relative_index);
                if let Err(e) = &outcome {
                    log::error!("QPACK encoder: duplicate({relative_index}) failed: {e:?}");
                }
                outcome
            }
        }
    }
}