use crate::{
HttpContext,
h3::{H3Error, H3ErrorCode, H3Settings},
headers::{
header_observer::HeaderCompression,
qpack::{FieldLineValue, HeaderObserver},
recent_pairs::RecentPairs,
},
};
use event_listener::{Event, EventListener};
use state::TableState;
use std::sync::{Arc, Mutex};
mod connection_metrics;
mod encode;
mod reader;
mod state;
#[cfg(test)]
mod tests;
mod writer;
use connection_metrics::ConnectionMetrics;
pub(in crate::headers) use state::SectionRefs;
#[derive(Debug)]
pub struct EncoderDynamicTable {
state: Mutex<TableState>,
observer: Arc<HeaderObserver>,
our_max_capacity: usize,
event: Event,
metrics: ConnectionMetrics,
}
impl Default for EncoderDynamicTable {
fn default() -> Self {
Self::new(&HttpContext::default())
}
}
impl EncoderDynamicTable {
pub(crate) fn new(context: &HttpContext) -> Self {
let observer = context.observer.clone();
let recent_pairs_size = context.config.recent_pairs_size;
log::trace!(
target: "qpack_metrics",
"new EncoderDynamicTable: observer ptr={:p} our_max_capacity={} recent_pairs_size={}",
Arc::as_ptr(&observer),
context.config.dynamic_table_capacity,
recent_pairs_size,
);
Self {
state: Mutex::new(TableState::new(RecentPairs::with_size(recent_pairs_size))),
observer,
our_max_capacity: context.config.dynamic_table_capacity,
event: Event::new(),
metrics: ConnectionMetrics::default(),
}
}
}
impl EncoderDynamicTable {
pub(crate) fn initialize_from_peer_settings(&self, peer_settings: H3Settings) {
let peer_max_capacity =
usize::try_from(peer_settings.qpack_max_table_capacity().unwrap_or(0))
.unwrap_or(usize::MAX);
let chosen = self.our_max_capacity.min(peer_max_capacity);
let max_blocked_streams =
usize::try_from(peer_settings.qpack_blocked_streams().unwrap_or(0))
.unwrap_or(usize::MAX);
let prime_entries = if chosen > 0 {
let cap = u32::try_from(chosen).unwrap_or(u32::MAX);
self.observer.prime(cap, HeaderCompression::Qpack)
} else {
Vec::new()
};
log::info!(
target: "qpack_metrics",
"initialize_from_peer_settings: peer_max_capacity={peer_max_capacity} \
our_max_capacity={our} chosen={chosen} max_blocked_streams={max_blocked_streams} \
prime_entries={}",
prime_entries.len(),
our = self.our_max_capacity,
);
let mut state = self.state.lock().unwrap();
debug_assert_eq!(
state.max_capacity, 0,
"initialize_from_peer_settings called twice"
);
state.max_capacity = chosen;
state.max_blocked_streams = max_blocked_streams;
if chosen > 0 {
state
.set_capacity(chosen)
.expect("set_capacity within max_capacity at init");
for candidate in prime_entries {
let name_for_metrics = candidate.name.clone();
let value_for_insert = candidate
.value
.clone()
.unwrap_or(FieldLineValue::Static(b""));
let value_for_metrics = value_for_insert.clone();
let kind = if candidate.value.is_some() {
"full-pair"
} else {
"name-only"
};
let entry_size = candidate.name.len() + value_for_insert.as_bytes().len() + 32;
match state.insert(candidate.name, value_for_insert, None) {
Ok(abs_idx) => {
state.primed_bytes = state.primed_bytes.saturating_add(entry_size);
let wire_bytes = state
.pending_ops
.back()
.map_or(0, |b| u64::try_from(b.len()).unwrap_or(u64::MAX));
log::info!(
target: "qpack_metrics",
"priming insert ({kind}): abs_idx={abs_idx} wire_bytes={wire_bytes} \
name={:?} value={:?}",
name_for_metrics,
String::from_utf8_lossy(value_for_metrics.as_bytes()),
);
self.metrics.record_primed_insert(
abs_idx,
name_for_metrics,
value_for_metrics,
wire_bytes,
);
}
Err(err) => {
log::debug!("qpack observer priming insert failed: {err:?}");
break;
}
}
}
drop(state);
self.event.notify(usize::MAX);
}
}
pub(in crate::headers) fn drain_pending_ops(&self) -> Vec<Vec<u8>> {
self.state.lock().unwrap().pending_ops.drain(..).collect()
}
pub(in crate::headers) fn listen(&self) -> EventListener {
self.event.listen()
}
pub(in crate::headers) fn on_section_ack(&self, stream_id: u64) -> Result<(), H3Error> {
let mut state = self.state.lock().unwrap();
let section = state
.outstanding_sections
.get_mut(&stream_id)
.and_then(std::collections::VecDeque::pop_front)
.ok_or(H3ErrorCode::QpackDecoderStreamError)?;
if state
.outstanding_sections
.get(&stream_id)
.is_some_and(std::collections::VecDeque::is_empty)
{
state.outstanding_sections.remove(&stream_id);
}
if section.required_insert_count > state.known_received_count {
state.known_received_count = section.required_insert_count;
}
drop(state);
self.event.notify(usize::MAX);
Ok(())
}
pub(in crate::headers) fn on_stream_cancel(&self, stream_id: u64) {
let mut state = self.state.lock().unwrap();
state.outstanding_sections.remove(&stream_id);
drop(state);
self.event.notify(usize::MAX);
}
pub(in crate::headers) fn on_insert_count_increment(
&self,
increment: u64,
) -> Result<(), H3Error> {
if increment == 0 {
return Err(H3ErrorCode::QpackDecoderStreamError.into());
}
let mut state = self.state.lock().unwrap();
let new_value = state
.known_received_count
.checked_add(increment)
.ok_or(H3ErrorCode::QpackDecoderStreamError)?;
if new_value > state.insert_count {
return Err(H3ErrorCode::QpackDecoderStreamError.into());
}
state.known_received_count = new_value;
drop(state);
self.event.notify(usize::MAX);
Ok(())
}
pub(crate) fn fail(&self, code: H3ErrorCode) {
self.state.lock().unwrap().failed = Some(code);
self.event.notify(usize::MAX);
}
pub(in crate::headers) fn failed(&self) -> Option<H3ErrorCode> {
self.state.lock().unwrap().failed
}
}
impl Drop for EncoderDynamicTable {
fn drop(&mut self) {
if let Ok(state) = self.state.lock() {
self.observer.fold_connection(&state.accum);
}
}
}