Skip to main content

h2session/
lib.rs

1#![warn(missing_docs)]
2//! Stateful HTTP/2 frame parser with HPACK support for passive traffic
3//! monitoring.
4//!
5//! This crate decodes HTTP/2 frames and HPACK-compressed headers from raw
6//! byte streams, maintaining per-connection decoder state so that dynamic
7//! table entries are preserved across successive `feed()` calls.
8//!
9//! # Key types
10//!
11//! - [`H2SessionCache`] — thread-safe cache of many connections keyed by an
12//!   arbitrary `K`. Best when you have many connections and want automatic
13//!   state management.
14//! - [`H2ConnectionState`] — state for a single HTTP/2 connection. Use
15//!   [`feed()`](H2ConnectionState::feed) to push data incrementally and
16//!   [`try_pop()`](H2ConnectionState::try_pop) to retrieve completed messages.
17//!
18//! # Examples
19//!
20//! ## Multi-connection cache
21//!
22//! ```no_run
23//! use h2session::{H2SessionCache, ParsedH2Message};
24//!
25//! let cache = H2SessionCache::<u64>::new();
26//!
27//! // Parse a buffer for connection 42
28//! let completed = cache.parse(42, &raw_bytes).unwrap();
29//! for (stream_id, msg) in &completed {
30//!     if msg.is_request() {
31//!         println!(
32//!             "{} {}",
33//!             msg.method.as_deref().unwrap_or("?"),
34//!             msg.path.as_deref().unwrap_or("/")
35//!         );
36//!     }
37//! }
38//! # let raw_bytes: Vec<u8> = vec![];
39//! ```
40//!
41//! ## Single-connection incremental parsing
42//!
43//! ```no_run
44//! use h2session::{H2ConnectionState, TimestampNs};
45//!
46//! let mut state = H2ConnectionState::new();
47//!
48//! // Feed data as it arrives
49//! state.feed(&chunk, TimestampNs(0)).unwrap();
50//!
51//! // Pop completed messages
52//! while let Some((stream_id, msg)) = state.try_pop() {
53//!     println!("stream {stream_id}: request={}", msg.is_request());
54//! }
55//! # let chunk: Vec<u8> = vec![];
56//! ```
57//!
58//! # Feature flags
59//!
60//! - **`tracing`** — emit `tracing::warn!` events for non-fatal parse issues
61//!   (stale stream eviction, etc.)
62
63mod frame;
64mod http_types;
65mod parse;
66mod state;
67
68#[cfg(test)]
69mod tests;
70
71#[cfg(feature = "tracing")]
72macro_rules! trace_warn {
73    ($($arg:tt)*) => { ::tracing::warn!($($arg)*) }
74}
75#[cfg(not(feature = "tracing"))]
76macro_rules! trace_warn {
77    ($($arg:tt)*) => {};
78}
79use std::{collections::HashMap, hash::Hash, sync::Mutex};
80
81// Public re-exports for direct state management
82use dashmap::DashMap;
83pub use frame::{CONNECTION_PREFACE, is_http2_preface, looks_like_http2_frame};
84pub use http_types::{HttpRequest, HttpResponse};
85pub use state::{
86    H2ConnectionState,
87    H2Limits,
88    ParseError,
89    ParseErrorKind,
90    ParsedH2Message,
91    StreamId,
92    TimestampNs,
93};
94pub(crate) use trace_warn;
95
96/// HTTP/2 session cache with generic connection keys.
97///
98/// Uses `DashMap<K, Mutex<H2ConnectionState>>` to provide per-key
99/// serialization. The DashMap shard lock is held only briefly (to look up or
100/// insert the entry), while the per-key Mutex serializes concurrent same-key
101/// calls to `parse()`. This prevents the remove-and-reinsert race where two
102/// threads would both create default state for the same key, losing one
103/// thread's HPACK table.
104pub struct H2SessionCache<K> {
105    connections: DashMap<K, Mutex<H2ConnectionState>>,
106}
107
108impl<K: Hash + Eq + Clone> H2SessionCache<K> {
109    /// Create a new cache
110    pub fn new() -> Self {
111        Self {
112            connections: DashMap::new(),
113        }
114    }
115
116    /// Parse buffer with connection state
117    ///
118    /// If the connection key doesn't exist, creates new state automatically.
119    /// Returns completed HTTP/2 messages indexed by stream_id. The map may be
120    /// empty if no streams completed yet — this is not an error.
121    pub fn parse(
122        &self,
123        key: K,
124        buffer: &[u8],
125    ) -> Result<HashMap<StreamId, ParsedH2Message>, ParseError> {
126        // Atomic insert-if-absent
127        self.connections
128            .entry(key.clone())
129            .or_insert_with(|| Mutex::new(H2ConnectionState::default()));
130
131        // Get shared shard read lock + per-key mutex lock
132        let entry = self.connections.get(&key).expect("entry was just ensured");
133        let mut state = entry.lock().unwrap_or_else(|e| e.into_inner());
134        parse::parse_frames_stateful(buffer, &mut state)
135    }
136
137    /// Remove connection state (call when connection closes)
138    pub fn remove(&self, key: &K) -> Option<H2ConnectionState> {
139        self.connections
140            .remove(key)
141            .map(|(_, mutex)| mutex.into_inner().unwrap_or_else(|e| e.into_inner()))
142    }
143
144    /// Check if connection state exists
145    pub fn contains(&self, key: &K) -> bool {
146        self.connections.contains_key(key)
147    }
148
149    /// Get number of tracked connections
150    pub fn len(&self) -> usize {
151        self.connections.len()
152    }
153
154    /// Check if cache is empty
155    pub fn is_empty(&self) -> bool {
156        self.connections.is_empty()
157    }
158}
159
160impl<K: Hash + Eq + Clone> Default for H2SessionCache<K> {
161    fn default() -> Self {
162        Self::new()
163    }
164}