use crate::{
h3::{H3Error, H3ErrorCode},
headers::entry_name::EntryName,
};
use event_listener::{Event, EventListener};
use std::{
borrow::Cow,
collections::{BTreeMap, VecDeque},
fmt::{self, Debug},
future::Future,
pin::Pin,
sync::Mutex,
task::{Context, Poll, Waker},
};
mod decode;
mod reader;
#[cfg(test)]
mod tests;
mod writer;
/// QPACK decoder dynamic table.
///
/// All mutable state sits behind the `inner` mutex. The `event` is notified
/// (to every listener) after each successful `insert`, after each
/// `acknowledge_section`, and on `fail`; callers subscribe through `listen`.
#[derive(Debug)]
pub struct DecoderDynamicTable {
/// Mutex-guarded table state.
inner: Mutex<DecoderDynamicTableInner>,
/// Change notification handed out by `listen`.
event: Event,
}
/// State of the dynamic table, always accessed while holding the outer mutex.
struct DecoderDynamicTableInner {
/// Live entries; new entries are pushed at the front and evicted from the back.
entries: VecDeque<DynamicEntry>,
/// Largest value `set_capacity` accepts; also the basis for `max_entries`
/// (`max_capacity / 32`) when decoding a required insert count.
max_capacity: usize,
/// Capacity currently in force; starts at 0 and is changed by `set_capacity`.
capacity: usize,
/// Sum of the `size` of every entry in `entries`.
current_size: usize,
/// Number of successful inserts so far; incremented once per `insert`.
insert_count: u64,
/// Failure code recorded by `fail`, if any.
failed: Option<H3ErrorCode>,
/// Acknowledgements queued by `acknowledge_section` until drained.
pending_section_acks: Vec<PendingSectionAck>,
/// Upper bound on `currently_blocked_streams`.
max_blocked_streams: usize,
/// Number of outstanding `BlockedStreamGuard`s.
currently_blocked_streams: usize,
/// Wakers of pending `ThresholdWait`s, keyed by `(threshold, waiter id)` so
/// that every waiter at or below a given insert count forms a key prefix.
waiters: BTreeMap<(u64, u64), Waker>,
/// Next id handed to a `ThresholdWait` on first registration (wraps on overflow).
next_waiter_id: u64,
}
/// Hand-written so that entry values are rendered as lossy UTF-8 text in a
/// `name: value` map instead of as raw byte slices.
impl Debug for DecoderDynamicTableInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Lazily formatted view over the entries, shown as a map in table order.
        let entries_view = fmt::from_fn(|out: &mut fmt::Formatter<'_>| {
            let mut rendered = out.debug_map();
            for entry in &self.entries {
                rendered.entry(
                    &entry.name,
                    &format_args!("{}", String::from_utf8_lossy(&entry.value)),
                );
            }
            rendered.finish()
        });

        let mut dbg = f.debug_struct("DecoderDynamicTableInner");
        dbg.field("entries", &entries_view);
        dbg.field("max_capacity", &self.max_capacity);
        dbg.field("capacity", &self.capacity);
        dbg.field("current_size", &self.current_size);
        dbg.field("insert_count", &self.insert_count);
        dbg.field("failed", &self.failed);
        dbg.field("pending_section_acks", &self.pending_section_acks);
        dbg.field("max_blocked_streams", &self.max_blocked_streams);
        dbg.field("currently_blocked_streams", &self.currently_blocked_streams);
        dbg.field("waiters", &self.waiters);
        dbg.field("next_waiter_id", &self.next_waiter_id);
        dbg.finish()
    }
}
/// A section acknowledgement recorded by `acknowledge_section` and handed out
/// by `drain_pending_acks_and_count`.
// NOTE(review): presumably consumed by the code that writes decoder-stream
// instructions — confirm against the caller.
#[derive(Debug, Clone, Copy)]
pub(in crate::headers) struct PendingSectionAck {
/// Stream id passed to `acknowledge_section`.
pub(in crate::headers) stream_id: u64,
/// Required insert count passed to `acknowledge_section`.
pub(in crate::headers) required_insert_count: u64,
}
/// One name/value pair stored in the dynamic table.
#[derive(Clone)]
struct DynamicEntry {
/// Header field name.
name: EntryName<'static>,
/// Header field value as raw bytes.
value: Cow<'static, [u8]>,
/// Size charged against the table capacity: name length + value length + 32,
/// computed once at insertion time.
size: usize,
}
/// Hand-written so the value bytes are printed as lossy UTF-8 text.
impl Debug for DynamicEntry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { name, value, size } = self;
        let mut dbg = f.debug_struct("DynamicEntry");
        dbg.field("name", name);
        dbg.field("value", &format_args!("{}", String::from_utf8_lossy(value)));
        dbg.field("size", size);
        dbg.finish()
    }
}
/// Token for one reserved blocked-stream slot, created by
/// `DecoderDynamicTable::try_reserve_blocked_stream`.
///
/// Dropping the guard gives the slot back to the table it borrows.
#[derive(Debug)]
pub(in crate::headers) struct BlockedStreamGuard<'a>(&'a DecoderDynamicTable);
// Returns the reserved slot to the table when the guard goes out of scope.
impl Drop for BlockedStreamGuard<'_> {
fn drop(&mut self) {
// Undoes the increment performed in `try_reserve_blocked_stream`.
self.0.decrement_blocked_streams();
}
}
impl DecoderDynamicTable {
/// Creates an empty table.
///
/// `max_capacity` is the ceiling later accepted by `set_capacity`; the
/// capacity actually in force starts at zero. `max_blocked_streams` caps how
/// many `BlockedStreamGuard`s may be outstanding at once.
pub(crate) fn new(max_capacity: usize, max_blocked_streams: usize) -> Self {
    let initial_state = DecoderDynamicTableInner {
        // Caller-supplied limits.
        max_capacity,
        max_blocked_streams,
        // Everything else starts out empty / zeroed.
        entries: VecDeque::new(),
        capacity: 0,
        current_size: 0,
        insert_count: 0,
        failed: None,
        pending_section_acks: Vec::new(),
        currently_blocked_streams: 0,
        waiters: BTreeMap::new(),
        next_waiter_id: 0,
    };

    Self {
        inner: Mutex::new(initial_state),
        event: Event::new(),
    }
}
/// Reserves a blocked-stream slot if `required_insert_count` is not yet met.
///
/// # Returns
/// * `Ok(None)` — the table's insert count already reaches
///   `required_insert_count`, so no slot is taken.
/// * `Ok(Some(guard))` — a slot was taken; it is released when the guard drops.
///
/// # Errors
/// `H3ErrorCode::QpackDecompressionFailed` when `max_blocked_streams` slots
/// are already in use.
pub(in crate::headers) fn try_reserve_blocked_stream(
    &self,
    required_insert_count: u64,
) -> Result<Option<BlockedStreamGuard<'_>>, H3ErrorCode> {
    let mut state = self.inner.lock().unwrap();

    if required_insert_count <= state.insert_count {
        Ok(None)
    } else if state.currently_blocked_streams < state.max_blocked_streams {
        state.currently_blocked_streams += 1;
        Ok(Some(BlockedStreamGuard(self)))
    } else {
        Err(H3ErrorCode::QpackDecompressionFailed)
    }
}
/// Gives back one blocked-stream slot taken by `try_reserve_blocked_stream`.
///
/// This is reached from `BlockedStreamGuard::drop`, so it must not panic: if
/// the mutex was poisoned by a panic elsewhere, the guard may be dropped while
/// that panic is still unwinding, and a second panic would abort the process.
/// A poisoned lock is therefore skipped, the same way `ThresholdWait::drop`
/// treats it.
fn decrement_blocked_streams(&self) {
    if let Ok(mut inner) = self.inner.lock() {
        inner.currently_blocked_streams -= 1;
    }
}
/// Returns the maximum capacity this table was created with.
pub(in crate::headers) fn max_capacity(&self) -> usize {
    let state = self.inner.lock().unwrap();
    state.max_capacity
}
/// Reconstructs the full required insert count from its wrapped wire encoding.
///
/// `encoded == 0` always decodes to `0`. Otherwise the value is un-wrapped
/// modulo `2 * max_entries`, where `max_entries = max_capacity / 32`, relative
/// to the table's current insert count.
///
/// NOTE(review): the arithmetic mirrors the reconstruction pseudocode in
/// RFC 9204 §4.5.1.1 — re-check against the spec before changing it.
///
/// # Errors
/// `H3ErrorCode::QpackDecompressionFailed` when `max_entries` is zero, when
/// `encoded` exceeds the full range, or when the reconstructed value would be
/// zero or negative.
pub(in crate::headers) fn decode_required_insert_count(
    &self,
    encoded: usize,
) -> Result<u64, H3ErrorCode> {
    if encoded == 0 {
        return Ok(0);
    }

    let state = self.inner.lock().unwrap();
    let max_entries = state.max_capacity / 32;
    let full_range = 2 * max_entries;
    if max_entries == 0 || encoded > full_range {
        return Err(H3ErrorCode::QpackDecompressionFailed);
    }

    let full_range = full_range as u64;
    let max_value = state.insert_count + max_entries as u64;
    // Largest multiple of the full range that does not exceed `max_value`.
    let max_wrapped = (max_value / full_range) * full_range;
    let candidate = max_wrapped + encoded as u64 - 1;

    // Overshooting `max_value` means the encoder wrapped one time fewer; if
    // there is no full range left to subtract, the encoding is invalid.
    let required = if candidate > max_value {
        candidate
            .checked_sub(full_range)
            .ok_or(H3ErrorCode::QpackDecompressionFailed)?
    } else {
        candidate
    };

    // A real count of zero is only ever encoded as zero, handled above.
    match required {
        0 => Err(H3ErrorCode::QpackDecompressionFailed),
        count => Ok(count),
    }
}
/// Changes the capacity in force, evicting the oldest entries until the
/// stored size fits.
///
/// # Errors
/// `H3ErrorCode::QpackEncoderStreamError` when `new_capacity` is larger than
/// the table's maximum capacity; the table is left untouched in that case.
pub(in crate::headers) fn set_capacity(&self, new_capacity: usize) -> Result<(), H3Error> {
    let mut guard = self.inner.lock().unwrap();
    let state = &mut *guard;

    if new_capacity > state.max_capacity {
        return Err(H3ErrorCode::QpackEncoderStreamError.into());
    }
    state.capacity = new_capacity;

    // Oldest entries sit at the back of the deque.
    while state.current_size > state.capacity {
        match state.entries.pop_back() {
            Some(evicted) => state.current_size -= evicted.size,
            None => break,
        }
    }
    Ok(())
}
/// Adds a new entry at the front of the table.
///
/// The entry is charged `name.len() + value.len() + 32` bytes. Older entries
/// are evicted from the back until it fits. After the insert, every waiter
/// whose threshold is at or below the new insert count is woken and the
/// table's event is notified; both happen after the lock has been released.
///
/// # Errors
/// `H3ErrorCode::QpackEncoderStreamError` when the entry alone is larger than
/// the current capacity; nothing is evicted or inserted in that case.
pub(in crate::headers) fn insert(
    &self,
    name: impl Into<EntryName<'static>>,
    value: Cow<'static, [u8]>,
) -> Result<(), H3Error> {
    let name = name.into();
    let entry_size = name.len() + value.len() + 32;

    let mut guard = self.inner.lock().unwrap();
    let state = &mut *guard;

    if entry_size > state.capacity {
        log::error!(
            "Qpack Decoder table entry {name}: (value) exceeded capacity: {entry_size} > {}",
            state.capacity
        );
        return Err(H3ErrorCode::QpackEncoderStreamError.into());
    }

    // Make room by discarding the oldest entries, which live at the back.
    while state.current_size + entry_size > state.capacity {
        match state.entries.pop_back() {
            Some(evicted) => state.current_size -= evicted.size,
            None => break,
        }
    }

    state.entries.push_front(DynamicEntry {
        name,
        value,
        size: entry_size,
    });
    state.current_size += entry_size;
    state.insert_count += 1;

    // Keys are `(threshold, id)`, so everything up to `(insert_count, MAX)` is
    // exactly the set of waiters whose threshold has now been reached.
    let last_ready_key = (state.insert_count, u64::MAX);
    let ready: Vec<Waker> = state
        .waiters
        .extract_if(..=last_ready_key, |_, _| true)
        .map(|(_key, waker)| waker)
        .collect();

    // Wake outside the lock so woken tasks can take it immediately.
    drop(guard);
    ready.into_iter().for_each(Waker::wake);
    self.event.notify(usize::MAX);
    Ok(())
}
/// Re-inserts a copy of the entry at `relative_index` (0 = most recently
/// inserted) as a new entry.
///
/// The source entry is cloned and the lock released before `insert` runs, so
/// the copy survives even if the insert evicts the original.
///
/// # Errors
/// `H3ErrorCode::QpackEncoderStreamError` when no entry exists at
/// `relative_index`, plus any error from `insert`.
pub(in crate::headers) fn duplicate(&self, relative_index: usize) -> Result<(), H3Error> {
    let guard = self.inner.lock().unwrap();
    let Some(source) = guard.entries.get(relative_index) else {
        return Err(H3ErrorCode::QpackEncoderStreamError.into());
    };
    let (name, value) = (source.name.clone(), source.value.clone());
    // `insert` takes the same mutex, so it must be released first.
    drop(guard);

    self.insert(name, value)
}
/// Returns a clone of the name stored at `relative_index` (0 = most recently
/// inserted), or `None` when the index is past the end of the table.
pub(in crate::headers) fn name_at_relative(
    &self,
    relative_index: usize,
) -> Option<EntryName<'static>> {
    let state = self.inner.lock().unwrap();
    let entry = state.entries.get(relative_index)?;
    Some(entry.name.clone())
}
/// Queues a section acknowledgement for `stream_id` and notifies all
/// listeners of the table's event.
///
/// The acknowledgement stays queued until `drain_pending_acks_and_count`
/// collects it.
pub(in crate::headers) fn acknowledge_section(
    &self,
    stream_id: u64,
    required_insert_count: u64,
) {
    let ack = PendingSectionAck {
        stream_id,
        required_insert_count,
    };
    {
        // Scope the lock so it is released before listeners are notified.
        let mut state = self.inner.lock().unwrap();
        state.pending_section_acks.push(ack);
    }
    self.event.notify(usize::MAX);
}
/// Atomically takes every queued section acknowledgement and reads the
/// current insert count.
///
/// Returns `(acks, insert_count)`; the internal queue is empty afterwards.
pub(in crate::headers) fn drain_pending_acks_and_count(&self) -> (Vec<PendingSectionAck>, u64) {
    let mut state = self.inner.lock().unwrap();
    let insert_count = state.insert_count;
    let drained = std::mem::take(&mut state.pending_section_acks);
    (drained, insert_count)
}
/// Returns a new listener on the table's event.
///
/// The event is notified by `insert`, `acknowledge_section`, and `fail`.
pub(in crate::headers) fn listen(&self) -> EventListener {
self.event.listen()
}
/// Marks the table as failed with `code` and releases everything waiting on it.
///
/// All registered waiters are removed and woken (they observe the failure on
/// their next poll), and the table's event is notified. Waking happens after
/// the lock is released. A later call overwrites the stored code.
pub(crate) fn fail(&self, code: H3ErrorCode) {
    let mut state = self.inner.lock().unwrap();
    state.failed = Some(code);
    let parked = std::mem::take(&mut state.waiters);
    drop(state);

    parked.into_values().for_each(Waker::wake);
    self.event.notify(usize::MAX);
}
/// Waits until the table's insert count reaches `required_insert_count`, then
/// returns a clone of the entry at `absolute_index`.
///
/// # Errors
/// * The code passed to `fail`, if the table has failed (whether before or
///   during the wait).
/// * `H3ErrorCode::QpackDecompressionFailed` when `absolute_index` does not
///   refer to an entry currently in the table (not yet inserted, or evicted).
pub(in crate::headers) async fn get(
    &self,
    absolute_index: u64,
    required_insert_count: u64,
) -> Result<(EntryName<'static>, Cow<'static, [u8]>), H3Error> {
    let until_inserted = ThresholdWait {
        table: self,
        threshold: required_insert_count,
        waiter_id: None,
    };
    until_inserted.await?;

    // The lock is only taken after the await, never held across it.
    let state = self.inner.lock().unwrap();
    match state.failed {
        Some(code) => Err(code.into()),
        None => state
            .get(absolute_index)
            .ok_or_else(|| H3ErrorCode::QpackDecompressionFailed.into()),
    }
}
}
/// Future that resolves once the table's insert count reaches `threshold`, or
/// with an error once the table has failed.
struct ThresholdWait<'a> {
/// Table being waited on.
table: &'a DecoderDynamicTable,
/// Insert count that must be reached before the future completes.
threshold: u64,
/// Id under which this future's waker is registered in the table's waiter
/// map; `None` until the first poll that returns `Pending`.
waiter_id: Option<u64>,
}
/// Completes with `Ok(())` when the insert count reaches the threshold and
/// with `Err(code)` when the table has failed; failure takes precedence.
impl Future for ThresholdWait<'_> {
    type Output = Result<(), H3ErrorCode>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let mut inner = this.table.inner.lock().unwrap();

        // Decide first whether the wait is over; failure is checked before success.
        let settled = match inner.failed {
            Some(code) => Some(Err(code)),
            None if inner.insert_count >= this.threshold => Some(Ok(())),
            None => None,
        };

        if let Some(outcome) = settled {
            // Deregister if we had parked a waker on an earlier poll.
            if let Some(id) = this.waiter_id.take() {
                inner.waiters.remove(&(this.threshold, id));
                if outcome.is_ok() {
                    log::trace!(
                        "QPACK: insert_count {} met required {} — unblocked",
                        inner.insert_count,
                        this.threshold
                    );
                }
            }
            return Poll::Ready(outcome);
        }

        // Still waiting: reuse our id, or allocate one on the first pending poll.
        let id = match this.waiter_id {
            Some(existing) => existing,
            None => {
                let fresh = inner.next_waiter_id;
                inner.next_waiter_id = fresh.wrapping_add(1);
                log::trace!(
                    "QPACK: waiting for insert_count >= {} (currently {})",
                    this.threshold,
                    inner.insert_count
                );
                fresh
            }
        };

        // Always store the latest waker; the task may have moved between polls.
        inner
            .waiters
            .insert((this.threshold, id), cx.waker().clone());
        this.waiter_id = Some(id);
        Poll::Pending
    }
}
/// Removes this future's waker from the table if it is dropped while still
/// registered (for example when the waiting task is cancelled).
impl Drop for ThresholdWait<'_> {
    fn drop(&mut self) {
        let Some(id) = self.waiter_id.take() else {
            // Never registered, or already deregistered by `poll`.
            return;
        };
        // A poisoned lock is skipped: panicking inside `drop` is worse than
        // leaving a stale waker behind.
        let Ok(mut inner) = self.table.inner.lock() else {
            return;
        };
        inner.waiters.remove(&(self.threshold, id));
    }
}
impl DecoderDynamicTableInner {
    /// Looks up an entry by absolute index and returns a clone of its name and
    /// value.
    ///
    /// Absolute index `insert_count - 1` is the newest entry (front of the
    /// deque). Returns `None` when the table has had no inserts, when
    /// `absolute_index` is at or beyond `insert_count`, or when the entry has
    /// already been evicted.
    fn get(&self, absolute_index: u64) -> Option<(EntryName<'static>, Cow<'static, [u8]>)> {
        let newest = self.insert_count.checked_sub(1)?;
        // Distance back from the newest entry equals the position in the deque.
        let distance = newest.checked_sub(absolute_index)?;
        let slot = usize::try_from(distance).ok()?;
        let entry = self.entries.get(slot)?;
        Some((entry.name.clone(), entry.value.clone()))
    }
}