h2session/lib.rs
1#![warn(missing_docs)]
2//! Stateful HTTP/2 frame parser with HPACK support for passive traffic
3//! monitoring.
4//!
5//! This crate decodes HTTP/2 frames and HPACK-compressed headers from raw
6//! byte streams, maintaining per-connection decoder state so that dynamic
7//! table entries are preserved across successive `feed()` calls.
8//!
9//! # Key types
10//!
11//! - [`H2SessionCache`] — thread-safe cache of many connections keyed by an
12//! arbitrary `K`. Best when you have many connections and want automatic
13//! state management.
14//! - [`H2ConnectionState`] — state for a single HTTP/2 connection. Use
15//! [`feed()`](H2ConnectionState::feed) to push data incrementally and
16//! [`try_pop()`](H2ConnectionState::try_pop) to retrieve completed messages.
17//!
18//! # Examples
19//!
20//! ## Multi-connection cache
21//!
22//! ```no_run
23//! use h2session::{H2SessionCache, ParsedH2Message};
24//!
25//! let cache = H2SessionCache::<u64>::new();
26//!
27//! // Parse a buffer for connection 42
28//! let completed = cache.parse(42, &raw_bytes).unwrap();
29//! for (stream_id, msg) in &completed {
30//! if msg.is_request() {
31//! println!(
32//! "{} {}",
33//! msg.method.as_deref().unwrap_or("?"),
34//! msg.path.as_deref().unwrap_or("/")
35//! );
36//! }
37//! }
38//! # let raw_bytes: Vec<u8> = vec![];
39//! ```
40//!
41//! ## Single-connection incremental parsing
42//!
43//! ```no_run
44//! use h2session::{H2ConnectionState, TimestampNs};
45//!
46//! let mut state = H2ConnectionState::new();
47//!
48//! // Feed data as it arrives
49//! state.feed(&chunk, TimestampNs(0)).unwrap();
50//!
51//! // Pop completed messages
52//! while let Some((stream_id, msg)) = state.try_pop() {
53//! println!("stream {stream_id}: request={}", msg.is_request());
54//! }
55//! # let chunk: Vec<u8> = vec![];
56//! ```
57//!
58//! # Feature flags
59//!
60//! - **`tracing`** — emit `tracing::warn!` events for non-fatal parse issues
61//! (stale stream eviction, etc.)
62
63mod frame;
64mod http_types;
65mod parse;
66mod state;
67
68#[cfg(test)]
69mod tests;
70
71#[cfg(feature = "tracing")]
72macro_rules! trace_warn {
73 ($($arg:tt)*) => { ::tracing::warn!($($arg)*) }
74}
75#[cfg(not(feature = "tracing"))]
76macro_rules! trace_warn {
77 ($($arg:tt)*) => {};
78}
79use std::{collections::HashMap, hash::Hash, sync::Mutex};
80
81// Public re-exports for direct state management
82use dashmap::DashMap;
83pub use frame::{CONNECTION_PREFACE, is_http2_preface, looks_like_http2_frame};
84pub use http_types::{HttpRequest, HttpResponse};
85pub use state::{
86 H2ConnectionState,
87 H2Limits,
88 ParseError,
89 ParseErrorKind,
90 ParsedH2Message,
91 StreamId,
92 TimestampNs,
93};
94pub(crate) use trace_warn;
95
96/// HTTP/2 session cache with generic connection keys.
97///
98/// Uses `DashMap<K, Mutex<H2ConnectionState>>` to provide per-key
99/// serialization. The DashMap shard lock is held only briefly (to look up or
100/// insert the entry), while the per-key Mutex serializes concurrent same-key
101/// calls to `parse()`. This prevents the remove-and-reinsert race where two
102/// threads would both create default state for the same key, losing one
103/// thread's HPACK table.
104pub struct H2SessionCache<K> {
105 connections: DashMap<K, Mutex<H2ConnectionState>>,
106}
107
108impl<K: Hash + Eq + Clone> H2SessionCache<K> {
109 /// Create a new cache
110 pub fn new() -> Self {
111 Self {
112 connections: DashMap::new(),
113 }
114 }
115
116 /// Parse buffer with connection state
117 ///
118 /// If the connection key doesn't exist, creates new state automatically.
119 /// Returns completed HTTP/2 messages indexed by stream_id. The map may be
120 /// empty if no streams completed yet — this is not an error.
121 pub fn parse(
122 &self,
123 key: K,
124 buffer: &[u8],
125 ) -> Result<HashMap<StreamId, ParsedH2Message>, ParseError> {
126 // Atomic insert-if-absent
127 self.connections
128 .entry(key.clone())
129 .or_insert_with(|| Mutex::new(H2ConnectionState::default()));
130
131 // Get shared shard read lock + per-key mutex lock
132 let entry = self.connections.get(&key).expect("entry was just ensured");
133 let mut state = entry.lock().unwrap_or_else(|e| e.into_inner());
134 parse::parse_frames_stateful(buffer, &mut state)
135 }
136
137 /// Remove connection state (call when connection closes)
138 pub fn remove(&self, key: &K) -> Option<H2ConnectionState> {
139 self.connections
140 .remove(key)
141 .map(|(_, mutex)| mutex.into_inner().unwrap_or_else(|e| e.into_inner()))
142 }
143
144 /// Check if connection state exists
145 pub fn contains(&self, key: &K) -> bool {
146 self.connections.contains_key(key)
147 }
148
149 /// Get number of tracked connections
150 pub fn len(&self) -> usize {
151 self.connections.len()
152 }
153
154 /// Check if cache is empty
155 pub fn is_empty(&self) -> bool {
156 self.connections.is_empty()
157 }
158}
159
160impl<K: Hash + Eq + Clone> Default for H2SessionCache<K> {
161 fn default() -> Self {
162 Self::new()
163 }
164}