use std::collections::{HashMap, VecDeque};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct StreamId(pub u32);
impl std::fmt::Display for StreamId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<u32> for StreamId {
fn from(v: u32) -> Self {
Self(v)
}
}
impl From<StreamId> for u32 {
fn from(v: StreamId) -> Self {
v.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TimestampNs(pub u64);
impl TimestampNs {
pub fn saturating_sub(self, other: TimestampNs) -> u64 {
self.0.saturating_sub(other.0)
}
}
impl std::fmt::Display for TimestampNs {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}ns", self.0)
}
}
impl From<u64> for TimestampNs {
fn from(v: u64) -> Self {
Self(v)
}
}
impl From<TimestampNs> for u64 {
fn from(v: TimestampNs) -> Self {
v.0
}
}
#[derive(Debug, Clone)]
pub struct H2Limits {
pub max_header_list_size: usize,
pub max_header_count: usize,
pub max_header_value_size: usize,
pub max_table_size: usize,
pub max_concurrent_streams: usize,
pub stream_timeout_ns: u64,
pub max_body_size: usize,
pub max_buffer_size: usize,
}
impl Default for H2Limits {
fn default() -> Self {
Self {
max_header_list_size: 65536,
max_header_count: 128,
max_header_value_size: 8192,
max_table_size: 65536,
max_concurrent_streams: 100,
stream_timeout_ns: 30_000_000_000,
max_body_size: 10 * 1024 * 1024, max_buffer_size: 1024 * 1024, }
}
}
pub struct H2ConnectionState {
pub(crate) decoder: loona_hpack::Decoder<'static>,
pub(crate) active_streams: HashMap<StreamId, StreamState>,
pub(crate) settings: H2Settings,
pub(crate) limits: H2Limits,
pub preface_received: bool,
pub(crate) highest_stream_id: StreamId,
pub(crate) buffer: Vec<u8>,
pub(crate) expecting_continuation: Option<StreamId>,
pub(crate) completed: VecDeque<(StreamId, ParsedH2Message)>,
pub(crate) current_timestamp_ns: TimestampNs,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StreamPhase {
ReceivingHeaders { end_stream_seen: bool },
ReceivingBody,
Complete,
}
pub(crate) struct StreamState {
pub(crate) headers: Vec<(String, String)>,
pub(crate) method: Option<String>,
pub(crate) path: Option<String>,
pub(crate) authority: Option<String>,
pub(crate) scheme: Option<String>,
pub(crate) status: Option<u16>,
pub(crate) body: Vec<u8>,
pub(crate) continuation_buffer: Vec<u8>,
pub(crate) header_size: usize,
pub(crate) phase: StreamPhase,
pub(crate) first_frame_timestamp_ns: TimestampNs,
pub(crate) end_stream_timestamp_ns: TimestampNs,
}
#[derive(Clone)]
pub(crate) struct H2Settings {
pub(crate) header_table_size: u32,
pub(crate) enable_push: bool,
pub(crate) max_concurrent_streams: u32,
pub(crate) initial_window_size: u32,
pub(crate) max_frame_size: u32,
pub(crate) max_header_list_size: u32,
}
impl Default for H2Settings {
fn default() -> Self {
Self {
header_table_size: 4096,
enable_push: true,
max_concurrent_streams: u32::MAX,
initial_window_size: 65535,
max_frame_size: 16384,
max_header_list_size: u32::MAX,
}
}
}
impl Default for H2ConnectionState {
fn default() -> Self {
let limits = H2Limits::default();
let mut decoder = loona_hpack::Decoder::new();
decoder.set_max_allowed_table_size(limits.max_table_size);
Self {
decoder,
active_streams: HashMap::new(),
settings: H2Settings::default(),
limits,
preface_received: false,
highest_stream_id: StreamId(0),
buffer: Vec::new(),
expecting_continuation: None,
completed: VecDeque::new(),
current_timestamp_ns: TimestampNs(0),
}
}
}
impl H2ConnectionState {
pub fn new() -> Self {
Self::default()
}
pub fn with_limits(limits: H2Limits) -> Self {
let mut decoder = loona_hpack::Decoder::new();
decoder.set_max_allowed_table_size(limits.max_table_size);
Self {
decoder,
active_streams: HashMap::new(),
settings: H2Settings::default(),
limits,
preface_received: false,
highest_stream_id: StreamId(0),
buffer: Vec::new(),
expecting_continuation: None,
completed: VecDeque::new(),
current_timestamp_ns: TimestampNs(0),
}
}
pub fn feed(&mut self, data: &[u8], timestamp_ns: TimestampNs) -> Result<(), ParseError> {
if self.buffer.len() + data.len() > self.limits.max_buffer_size {
return Err(ParseError::new(ParseErrorKind::Http2BufferTooLarge));
}
self.buffer.extend_from_slice(data);
self.current_timestamp_ns = timestamp_ns;
crate::parse::parse_buffer_incremental(self)
}
pub fn try_pop(&mut self) -> Option<(StreamId, ParsedH2Message)> {
self.completed.pop_front()
}
pub fn has_completed(&self) -> bool {
!self.completed.is_empty()
}
pub fn clear_buffer(&mut self) {
self.buffer.clear();
}
pub fn active_stream_count(&self) -> usize {
self.active_streams.len()
}
pub fn evict_stale_streams(&mut self, current_time_ns: TimestampNs) {
let timeout = self.limits.stream_timeout_ns;
let max_streams = self.limits.max_concurrent_streams;
self.active_streams.retain(|_id, stream| {
let stale = current_time_ns
.0
.saturating_sub(stream.first_frame_timestamp_ns.0)
>= timeout;
if stale {
crate::trace_warn!("evicting stale stream {_id} (timeout)");
}
!stale
});
while self.active_streams.len() > max_streams {
let oldest_id = self
.active_streams
.iter()
.min_by_key(|(_, s)| s.first_frame_timestamp_ns)
.map(|(&id, _)| id);
if let Some(id) = oldest_id {
crate::trace_warn!("evicting stream {id} (over max_concurrent_streams)");
self.active_streams.remove(&id);
} else {
break;
}
}
}
}
impl StreamState {
pub(crate) fn new(_stream_id: StreamId, timestamp_ns: TimestampNs) -> Self {
Self {
headers: Vec::new(),
method: None,
path: None,
authority: None,
scheme: None,
status: None,
body: Vec::new(),
continuation_buffer: Vec::new(),
header_size: 0,
phase: StreamPhase::ReceivingHeaders {
end_stream_seen: false,
},
first_frame_timestamp_ns: timestamp_ns,
end_stream_timestamp_ns: TimestampNs(0),
}
}
}
#[derive(Debug, Clone)]
pub struct ParsedH2Message {
pub method: Option<String>,
pub path: Option<String>,
pub authority: Option<String>,
pub scheme: Option<String>,
pub status: Option<u16>,
pub headers: Vec<(String, String)>,
pub stream_id: StreamId,
pub header_size: usize,
pub body: Vec<u8>,
pub first_frame_timestamp_ns: TimestampNs,
pub end_stream_timestamp_ns: TimestampNs,
}
impl ParsedH2Message {
pub fn is_request(&self) -> bool {
self.method.is_some()
}
pub fn is_response(&self) -> bool {
self.status.is_some()
}
pub fn http_method(&self) -> Option<http::Method> {
self.method
.as_ref()
.and_then(|m| http::Method::from_bytes(m.as_bytes()).ok())
}
pub fn http_uri(&self) -> Option<http::Uri> {
let path = self.path.as_deref().unwrap_or("/");
path.parse().ok()
}
pub fn http_status(&self) -> Option<http::StatusCode> {
self.status.and_then(|s| http::StatusCode::from_u16(s).ok())
}
pub fn http_headers(&self) -> http::HeaderMap {
let mut header_map = http::HeaderMap::new();
if let Some(authority) = &self.authority
&& let Ok(v) = http::HeaderValue::from_str(authority)
{
header_map.insert(http::header::HOST, v);
}
for (name, value) in &self.headers {
if name.starts_with(':') {
continue;
}
let parsed = (
http::header::HeaderName::from_bytes(name.as_bytes()),
http::HeaderValue::from_str(value),
);
if let (Ok(n), Ok(v)) = parsed {
header_map.append(n, v);
}
}
header_map
}
pub fn to_http_request(&self) -> Option<crate::HttpRequest> {
Some(crate::HttpRequest {
method: self.http_method()?,
uri: self.http_uri()?,
headers: self.http_headers(),
body: self.body.clone(),
timestamp_ns: self.end_stream_timestamp_ns,
version: Some(2),
})
}
pub fn to_http_response(&self) -> Option<crate::HttpResponse> {
Some(crate::HttpResponse {
status: self.http_status()?,
headers: self.http_headers(),
body: self.body.clone(),
timestamp_ns: self.first_frame_timestamp_ns,
version: Some(2),
reason: None,
})
}
pub fn into_http_request(self) -> Option<crate::HttpRequest> {
let method = self.http_method()?;
let uri = self.http_uri()?;
let headers = self.http_headers();
Some(crate::HttpRequest {
method,
uri,
headers,
body: self.body,
timestamp_ns: self.end_stream_timestamp_ns,
version: None,
})
}
pub fn into_http_response(self) -> Option<crate::HttpResponse> {
let status = self.http_status()?;
let headers = self.http_headers();
Some(crate::HttpResponse {
status,
headers,
body: self.body,
timestamp_ns: self.first_frame_timestamp_ns,
version: None,
reason: None,
})
}
}
#[derive(Debug, Clone)]
pub enum ParseErrorKind {
Http2BufferTooSmall,
Http2HpackError(String),
Http2HeadersIncomplete,
Http2HeaderListTooLarge,
Http2NoMethod,
Http2NoPath,
Http2NoStatus,
Http2InvalidFrame,
Http2MaxConcurrentStreams,
Http2PaddingError,
Http2PriorityError,
Http2StreamNotFound,
Http2InvalidHeaderEncoding,
Http2BufferTooLarge,
Http2FrameSizeError,
Http2ContinuationExpected,
Http2SettingsLengthError,
}
impl std::fmt::Display for ParseErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Http2BufferTooSmall => write!(f, "HTTP/2 buffer too small to parse frame header"),
Self::Http2HpackError(msg) => write!(f, "HTTP/2 HPACK decoding error: {msg}"),
Self::Http2HeadersIncomplete => {
write!(f, "HTTP/2 headers incomplete (missing CONTINUATION)")
},
Self::Http2HeaderListTooLarge => write!(f, "HTTP/2 header list exceeds size limits"),
Self::Http2NoMethod => write!(f, "HTTP/2 request missing :method pseudo-header"),
Self::Http2NoPath => write!(f, "HTTP/2 request missing :path pseudo-header"),
Self::Http2NoStatus => write!(f, "HTTP/2 response missing :status pseudo-header"),
Self::Http2InvalidFrame => write!(f, "HTTP/2 invalid frame"),
Self::Http2MaxConcurrentStreams => {
write!(f, "HTTP/2 max concurrent streams limit reached")
},
Self::Http2PaddingError => write!(f, "HTTP/2 frame has missing or invalid padding"),
Self::Http2PriorityError => {
write!(f, "HTTP/2 PRIORITY flag present but header block too short")
},
Self::Http2StreamNotFound => write!(f, "HTTP/2 frame references unknown stream"),
Self::Http2InvalidHeaderEncoding => {
write!(f, "HTTP/2 header contains invalid UTF-8 encoding")
},
Self::Http2BufferTooLarge => {
write!(f, "HTTP/2 internal buffer exceeds max_buffer_size")
},
Self::Http2FrameSizeError => {
write!(f, "HTTP/2 frame payload exceeds negotiated max_frame_size")
},
Self::Http2ContinuationExpected => {
write!(
f,
"HTTP/2 expected CONTINUATION frame but received different frame type or \
stream"
)
},
Self::Http2SettingsLengthError => {
write!(
f,
"HTTP/2 SETTINGS frame payload is not a multiple of 6 bytes"
)
},
}
}
}
#[derive(Debug, Clone)]
pub struct ParseError {
pub kind: ParseErrorKind,
pub stream_id: Option<StreamId>,
}
impl ParseError {
pub fn new(kind: ParseErrorKind) -> Self {
Self {
kind,
stream_id: None,
}
}
pub fn with_stream(kind: ParseErrorKind, stream_id: StreamId) -> Self {
Self {
kind,
stream_id: Some(stream_id),
}
}
}
impl std::fmt::Display for ParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(sid) = self.stream_id {
write!(f, "[stream {sid}] {}", self.kind)
} else {
write!(f, "{}", self.kind)
}
}
}
impl std::error::Error for ParseError {}