pcapsql_core/stream/parsers/
http2.rs

1//! HTTP/2 frame parser for decrypted TLS streams.
2//!
3//! Implements RFC 7540 frame parsing with HPACK header decompression.
4//! This parser processes decrypted TLS application data containing HTTP/2 frames.
5
6use std::collections::HashMap;
7use std::sync::{Arc, Mutex};
8
9use bytes::{Buf, BytesMut};
10use compact_str::CompactString;
11use hpack::Decoder as HpackDecoder;
12
13use crate::protocol::{FieldValue, OwnedFieldValue};
14use crate::schema::{DataKind, FieldDescriptor};
15use crate::stream::{Direction, ParsedMessage, StreamContext, StreamParseResult, StreamParser};
16
/// HTTP/2 client connection preface: `"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"` (24 bytes).
///
/// `parse_stream` expects the client-to-server side of a connection to start
/// with exactly these bytes and strips them before frame parsing begins.
pub const CONNECTION_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
19
/// HTTP/2 frame types (RFC 7540 Section 6).
///
/// Type codes `0x0` through `0x9` map onto the named variants; any other
/// code is carried verbatim in [`FrameType::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameType {
    Data,
    Headers,
    Priority,
    RstStream,
    Settings,
    PushPromise,
    Ping,
    GoAway,
    WindowUpdate,
    Continuation,
    Unknown(u8),
}

impl From<u8> for FrameType {
    /// Converts the raw type byte of a frame header into a [`FrameType`].
    fn from(v: u8) -> Self {
        // Named frame types, each stored at the index equal to its type code.
        const KNOWN: [FrameType; 10] = [
            FrameType::Data,
            FrameType::Headers,
            FrameType::Priority,
            FrameType::RstStream,
            FrameType::Settings,
            FrameType::PushPromise,
            FrameType::Ping,
            FrameType::GoAway,
            FrameType::WindowUpdate,
            FrameType::Continuation,
        ];

        KNOWN
            .get(usize::from(v))
            .copied()
            .unwrap_or(Self::Unknown(v))
    }
}

impl FrameType {
    /// Returns the upper-case protocol name of the frame type.
    ///
    /// Every [`FrameType::Unknown`] value yields `"UNKNOWN"`, regardless of
    /// the code it carries.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Data => "DATA",
            Self::Headers => "HEADERS",
            Self::Priority => "PRIORITY",
            Self::RstStream => "RST_STREAM",
            Self::Settings => "SETTINGS",
            Self::PushPromise => "PUSH_PROMISE",
            Self::Ping => "PING",
            Self::GoAway => "GOAWAY",
            Self::WindowUpdate => "WINDOW_UPDATE",
            Self::Continuation => "CONTINUATION",
            Self::Unknown(_) => "UNKNOWN",
        }
    }
}
72
/// HTTP/2 frame flag bits.
///
/// The flag byte is interpreted per frame type by the parsers in this file:
/// `END_STREAM` is read on DATA/HEADERS frames while `ACK` is read on
/// SETTINGS/PING frames, so the two can share the same bit value.
pub mod flags {
    /// Bit tested by `FrameHeader::is_end_stream`.
    pub const END_STREAM: u8 = 0x1;
    /// Bit tested by `FrameHeader::is_end_headers`.
    pub const END_HEADERS: u8 = 0x4;
    /// Bit tested by `FrameHeader::is_padded`.
    pub const PADDED: u8 = 0x8;
    /// Bit tested by `FrameHeader::is_priority`.
    pub const PRIORITY: u8 = 0x20;
    /// Bit tested by `FrameHeader::is_ack`; same value as `END_STREAM`.
    pub const ACK: u8 = 0x1;
}
81
/// HTTP/2 error codes (RFC 7540 Section 7)
pub mod error_codes {
    pub const NO_ERROR: u32 = 0x0;
    pub const PROTOCOL_ERROR: u32 = 0x1;
    pub const INTERNAL_ERROR: u32 = 0x2;
    pub const FLOW_CONTROL_ERROR: u32 = 0x3;
    pub const SETTINGS_TIMEOUT: u32 = 0x4;
    pub const STREAM_CLOSED: u32 = 0x5;
    pub const FRAME_SIZE_ERROR: u32 = 0x6;
    pub const REFUSED_STREAM: u32 = 0x7;
    pub const CANCEL: u32 = 0x8;
    pub const COMPRESSION_ERROR: u32 = 0x9;
    pub const CONNECT_ERROR: u32 = 0xa;
    pub const ENHANCE_YOUR_CALM: u32 = 0xb;
    pub const INADEQUATE_SECURITY: u32 = 0xc;
    pub const HTTP_1_1_REQUIRED: u32 = 0xd;

    /// Returns the symbolic name of an error code, or `"UNKNOWN"` when the
    /// code is not one of the constants defined in this module.
    pub fn name(code: u32) -> &'static str {
        // Each known code paired with its symbolic name.
        const TABLE: [(u32, &str); 14] = [
            (NO_ERROR, "NO_ERROR"),
            (PROTOCOL_ERROR, "PROTOCOL_ERROR"),
            (INTERNAL_ERROR, "INTERNAL_ERROR"),
            (FLOW_CONTROL_ERROR, "FLOW_CONTROL_ERROR"),
            (SETTINGS_TIMEOUT, "SETTINGS_TIMEOUT"),
            (STREAM_CLOSED, "STREAM_CLOSED"),
            (FRAME_SIZE_ERROR, "FRAME_SIZE_ERROR"),
            (REFUSED_STREAM, "REFUSED_STREAM"),
            (CANCEL, "CANCEL"),
            (COMPRESSION_ERROR, "COMPRESSION_ERROR"),
            (CONNECT_ERROR, "CONNECT_ERROR"),
            (ENHANCE_YOUR_CALM, "ENHANCE_YOUR_CALM"),
            (INADEQUATE_SECURITY, "INADEQUATE_SECURITY"),
            (HTTP_1_1_REQUIRED, "HTTP_1_1_REQUIRED"),
        ];

        TABLE
            .iter()
            .find(|(known, _)| *known == code)
            .map_or("UNKNOWN", |(_, label)| *label)
    }
}
119
/// HTTP/2 settings identifiers (RFC 7540 Section 6.5.2)
pub mod settings {
    pub const HEADER_TABLE_SIZE: u16 = 0x1;
    pub const ENABLE_PUSH: u16 = 0x2;
    pub const MAX_CONCURRENT_STREAMS: u16 = 0x3;
    pub const INITIAL_WINDOW_SIZE: u16 = 0x4;
    pub const MAX_FRAME_SIZE: u16 = 0x5;
    pub const MAX_HEADER_LIST_SIZE: u16 = 0x6;

    /// Returns the symbolic name of a settings identifier, or `"UNKNOWN"`
    /// when the identifier is not one of the constants in this module.
    pub fn name(id: u16) -> &'static str {
        // Each known identifier paired with its symbolic name.
        const TABLE: [(u16, &str); 6] = [
            (HEADER_TABLE_SIZE, "HEADER_TABLE_SIZE"),
            (ENABLE_PUSH, "ENABLE_PUSH"),
            (MAX_CONCURRENT_STREAMS, "MAX_CONCURRENT_STREAMS"),
            (INITIAL_WINDOW_SIZE, "INITIAL_WINDOW_SIZE"),
            (MAX_FRAME_SIZE, "MAX_FRAME_SIZE"),
            (MAX_HEADER_LIST_SIZE, "MAX_HEADER_LIST_SIZE"),
        ];

        TABLE
            .iter()
            .find(|(known, _)| *known == id)
            .map_or("UNKNOWN", |(_, label)| *label)
    }
}
141
/// HTTP/2 frame header (9 bytes)
#[derive(Debug, Clone)]
pub struct FrameHeader {
    /// Payload length in bytes, decoded from the first three header bytes
    /// as a 24-bit big-endian integer.
    pub length: u32,
    /// Frame type, converted from the fourth header byte.
    pub frame_type: FrameType,
    /// Raw flag byte; test individual bits with the constants in `flags`.
    pub flags: u8,
    /// Stream identifier taken from the last four header bytes with the
    /// most significant bit masked off.
    pub stream_id: u32,
}
150
151impl FrameHeader {
152    pub const SIZE: usize = 9;
153
154    pub fn parse(data: &[u8]) -> Option<Self> {
155        if data.len() < Self::SIZE {
156            return None;
157        }
158
159        let length = ((data[0] as u32) << 16) | ((data[1] as u32) << 8) | (data[2] as u32);
160        let frame_type = FrameType::from(data[3]);
161        let flags = data[4];
162        let stream_id = ((data[5] as u32 & 0x7F) << 24)
163            | ((data[6] as u32) << 16)
164            | ((data[7] as u32) << 8)
165            | (data[8] as u32);
166
167        Some(FrameHeader {
168            length,
169            frame_type,
170            flags,
171            stream_id,
172        })
173    }
174
175    /// Check if END_STREAM flag is set
176    pub fn is_end_stream(&self) -> bool {
177        self.flags & flags::END_STREAM != 0
178    }
179
180    /// Check if END_HEADERS flag is set
181    pub fn is_end_headers(&self) -> bool {
182        self.flags & flags::END_HEADERS != 0
183    }
184
185    /// Check if PADDED flag is set
186    pub fn is_padded(&self) -> bool {
187        self.flags & flags::PADDED != 0
188    }
189
190    /// Check if PRIORITY flag is set
191    pub fn is_priority(&self) -> bool {
192        self.flags & flags::PRIORITY != 0
193    }
194
195    /// Check if ACK flag is set
196    pub fn is_ack(&self) -> bool {
197        self.flags & flags::ACK != 0
198    }
199}
200
/// Priority data for HEADERS and PRIORITY frames
#[derive(Debug, Clone)]
// NOTE(review): nothing in the visible part of this file constructs this
// type (the frame parsers write priority fields directly into the field
// map), which is presumably why dead-code warnings are silenced here.
#[allow(dead_code)]
pub struct PriorityData {
    /// Exclusive-dependency flag.
    pub exclusive: bool,
    /// Identifier of the stream this stream depends on.
    pub stream_dependency: u32,
    /// Priority weight.
    pub weight: u8,
}
209
/// HTTP/2 stream state (RFC 7540 Section 5.1)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    Idle,
    Open,
    #[allow(dead_code)] // RFC 7540 state - not yet implemented
    HalfClosedLocal,
    HalfClosedRemote,
    Closed,
}

impl StreamState {
    /// Returns the lower-case, human-readable label for the state.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Idle => "idle",
            Self::Open => "open",
            Self::HalfClosedLocal => "half-closed (local)",
            Self::HalfClosedRemote => "half-closed (remote)",
            Self::Closed => "closed",
        }
    }
}
232
/// HTTP/2 stream tracking
///
/// One instance exists per stream id of a connection and accumulates what
/// the frame parsers learn about the request and response on that stream.
#[derive(Debug, Clone)]
pub struct Http2Stream {
    #[allow(dead_code)] // Used in Debug output and future features
    pub stream_id: u32,
    /// Current state; starts as `Idle` and is advanced by the HEADERS, DATA
    /// and RST_STREAM parsers.
    pub state: StreamState,

    // Request info
    /// Value of the `:method` pseudo-header, once seen.
    pub method: Option<String>,
    /// Value of the `:path` pseudo-header, once seen.
    pub path: Option<String>,
    /// Value of the `:authority` pseudo-header, once seen.
    pub authority: Option<String>,
    /// Value of the `:scheme` pseudo-header, once seen.
    pub scheme: Option<String>,
    /// Decoded headers classified as belonging to the request.
    pub request_headers: Vec<(String, String)>,
    /// Total DATA payload bytes (padding excluded) seen towards the server.
    pub request_body_len: usize,

    // Response info
    /// Numeric value of the `:status` pseudo-header, once seen and parsed.
    pub status: Option<u16>,
    /// Decoded headers classified as belonging to the response.
    pub response_headers: Vec<(String, String)>,
    /// Total DATA payload bytes (padding excluded) seen towards the client.
    pub response_body_len: usize,

    // Frame tracking
    /// Frame number of the HEADERS frame that moved the stream out of `Idle`.
    pub request_frame: Option<u64>,
    /// Frame number recorded when `:status` was parsed. The CONTINUATION and
    /// PUSH_PROMISE paths pass 0 here because they have no frame number.
    pub response_frame: Option<u64>,
}
257
258impl Http2Stream {
259    pub fn new(stream_id: u32) -> Self {
260        Self {
261            stream_id,
262            state: StreamState::Idle,
263            method: None,
264            path: None,
265            authority: None,
266            scheme: None,
267            request_headers: Vec::new(),
268            request_body_len: 0,
269            status: None,
270            response_headers: Vec::new(),
271            response_body_len: 0,
272            request_frame: None,
273            response_frame: None,
274        }
275    }
276
277    /// Extract common header value
278    #[allow(dead_code)]
279    pub fn get_header(&self, name: &str) -> Option<&str> {
280        // Check request headers first, then response
281        for (n, v) in &self.request_headers {
282            if n.eq_ignore_ascii_case(name) {
283                return Some(v);
284            }
285        }
286        for (n, v) in &self.response_headers {
287            if n.eq_ignore_ascii_case(name) {
288                return Some(v);
289            }
290        }
291        None
292    }
293}
294
/// Per-connection HTTP/2 state
///
/// Holds everything that must survive between `parse_stream` calls for one
/// connection: HPACK decoders, known streams, a pending header block and
/// the bytes of any incomplete frame.
struct Http2ConnectionState {
    /// HPACK decoder (direction-aware)
    ///
    /// `client_decoder` decodes header blocks travelling to the server,
    /// `server_decoder` those travelling to the client (see `get_decoder`).
    client_decoder: HpackDecoder<'static>,
    server_decoder: HpackDecoder<'static>,

    /// Streams indexed by stream_id
    streams: HashMap<u32, Http2Stream>,

    /// Continuation state: (stream_id, accumulated header block, direction)
    ///
    /// Set by a HEADERS frame without END_HEADERS and consumed by the
    /// CONTINUATION frames that follow.
    continuation: Option<(u32, Vec<u8>, Direction)>,

    /// Buffer for incomplete frames
    buffer: BytesMut,

    /// Whether we've seen the client preface
    client_preface_seen: bool,

    /// Whether we've seen the server preface (SETTINGS frame)
    server_preface_seen: bool,
}
316
317impl Http2ConnectionState {
318    fn new() -> Self {
319        Self {
320            client_decoder: HpackDecoder::new(),
321            server_decoder: HpackDecoder::new(),
322            streams: HashMap::new(),
323            continuation: None,
324            buffer: BytesMut::new(),
325            client_preface_seen: false,
326            server_preface_seen: false,
327        }
328    }
329
330    fn get_decoder(&mut self, direction: Direction) -> &mut HpackDecoder<'static> {
331        match direction {
332            Direction::ToServer => &mut self.client_decoder,
333            Direction::ToClient => &mut self.server_decoder,
334        }
335    }
336}
337
/// HTTP/2 stream parser
///
/// Keeps one `Http2ConnectionState` per connection id so that frames split
/// across `parse_stream` calls and HPACK state can be carried forward.
pub struct Http2StreamParser {
    /// Per-connection state, keyed by `StreamContext::connection_id` and
    /// guarded by a mutex because the trait methods only receive `&self`.
    connections: Arc<Mutex<HashMap<u64, Http2ConnectionState>>>,
}
343
344impl Http2StreamParser {
345    pub fn new() -> Self {
346        Self {
347            connections: Arc::new(Mutex::new(HashMap::new())),
348        }
349    }
350
351    /// Remove state for a closed connection
352    pub fn remove_connection(&self, connection_id: u64) {
353        let mut connections = self.connections.lock().unwrap();
354        connections.remove(&connection_id);
355    }
356
357    /// Parse a single frame and return fields
358    fn parse_frame(
359        state: &mut Http2ConnectionState,
360        header: &FrameHeader,
361        payload: &[u8],
362        direction: Direction,
363        frame_number: u64,
364    ) -> HashMap<&'static str, OwnedFieldValue> {
365        let mut fields = HashMap::new();
366
367        fields.insert("frame_type", FieldValue::Str(header.frame_type.as_str()));
368        fields.insert("stream_id", FieldValue::UInt32(header.stream_id));
369        fields.insert("flags", FieldValue::UInt8(header.flags));
370        fields.insert("length", FieldValue::UInt32(header.length));
371
372        match header.frame_type {
373            FrameType::Data => {
374                Self::parse_data_frame(
375                    state,
376                    header,
377                    payload,
378                    direction,
379                    frame_number,
380                    &mut fields,
381                );
382            }
383            FrameType::Headers => {
384                Self::parse_headers_frame(
385                    state,
386                    header,
387                    payload,
388                    direction,
389                    frame_number,
390                    &mut fields,
391                );
392            }
393            FrameType::Priority => {
394                Self::parse_priority_frame(payload, &mut fields);
395            }
396            FrameType::RstStream => {
397                Self::parse_rst_stream_frame(state, header, payload, &mut fields);
398            }
399            FrameType::Settings => {
400                Self::parse_settings_frame(header, payload, &mut fields);
401            }
402            FrameType::PushPromise => {
403                Self::parse_push_promise_frame(state, header, payload, direction, &mut fields);
404            }
405            FrameType::Ping => {
406                Self::parse_ping_frame(header, payload, &mut fields);
407            }
408            FrameType::GoAway => {
409                Self::parse_goaway_frame(payload, &mut fields);
410            }
411            FrameType::WindowUpdate => {
412                Self::parse_window_update_frame(payload, &mut fields);
413            }
414            FrameType::Continuation => {
415                Self::parse_continuation_frame(state, header, payload, direction, &mut fields);
416            }
417            FrameType::Unknown(t) => {
418                fields.insert("unknown_type", FieldValue::UInt8(t));
419            }
420        }
421
422        // Add stream state if available
423        if header.stream_id != 0 {
424            if let Some(stream) = state.streams.get(&header.stream_id) {
425                fields.insert("stream_state", FieldValue::Str(stream.state.as_str()));
426            }
427        }
428
429        fields
430    }
431
432    fn parse_data_frame(
433        state: &mut Http2ConnectionState,
434        header: &FrameHeader,
435        payload: &[u8],
436        direction: Direction,
437        _frame_number: u64,
438        fields: &mut HashMap<&'static str, OwnedFieldValue>,
439    ) {
440        let (data, pad_len) = if header.is_padded() && !payload.is_empty() {
441            let pad_len = payload[0] as usize;
442            if pad_len < payload.len() {
443                (&payload[1..payload.len() - pad_len], pad_len)
444            } else {
445                (&payload[1..], 0)
446            }
447        } else {
448            (payload, 0)
449        };
450
451        fields.insert("data_length", FieldValue::UInt64(data.len() as u64));
452        if pad_len > 0 {
453            fields.insert("padding_length", FieldValue::UInt8(pad_len as u8));
454        }
455        fields.insert("end_stream", FieldValue::Bool(header.is_end_stream()));
456
457        // Update stream state
458        let stream = state
459            .streams
460            .entry(header.stream_id)
461            .or_insert_with(|| Http2Stream::new(header.stream_id));
462
463        match direction {
464            Direction::ToServer => {
465                stream.request_body_len += data.len();
466            }
467            Direction::ToClient => {
468                stream.response_body_len += data.len();
469            }
470        }
471
472        if header.is_end_stream() {
473            stream.state = match stream.state {
474                StreamState::Open => StreamState::HalfClosedRemote,
475                StreamState::HalfClosedLocal => StreamState::Closed,
476                _ => StreamState::Closed,
477            };
478        }
479    }
480
481    fn parse_headers_frame(
482        state: &mut Http2ConnectionState,
483        header: &FrameHeader,
484        payload: &[u8],
485        direction: Direction,
486        frame_number: u64,
487        fields: &mut HashMap<&'static str, OwnedFieldValue>,
488    ) {
489        let mut offset = 0;
490        let mut pad_len = 0;
491
492        if header.is_padded() && !payload.is_empty() {
493            pad_len = payload[0] as usize;
494            offset += 1;
495        }
496
497        // Parse priority if present
498        if header.is_priority() && payload.len() >= offset + 5 {
499            let dep_bytes = &payload[offset..offset + 4];
500            let dep = u32::from_be_bytes([
501                dep_bytes[0] & 0x7F,
502                dep_bytes[1],
503                dep_bytes[2],
504                dep_bytes[3],
505            ]);
506            let exclusive = dep_bytes[0] & 0x80 != 0;
507            let weight = payload[offset + 4];
508            offset += 5;
509
510            fields.insert("priority_exclusive", FieldValue::Bool(exclusive));
511            fields.insert("priority_dependency", FieldValue::UInt32(dep));
512            fields.insert("priority_weight", FieldValue::UInt8(weight));
513        }
514
515        let header_block_end = payload.len().saturating_sub(pad_len);
516        let header_block = &payload[offset.min(header_block_end)..header_block_end];
517
518        fields.insert("end_stream", FieldValue::Bool(header.is_end_stream()));
519        fields.insert("end_headers", FieldValue::Bool(header.is_end_headers()));
520
521        // Decode headers first if complete
522        let decoded_headers = if header.is_end_headers() {
523            let decoder = state.get_decoder(direction);
524            decoder.decode(header_block).ok()
525        } else {
526            // Start continuation
527            state.continuation = Some((header.stream_id, header_block.to_vec(), direction));
528            None
529        };
530
531        // Now get or create stream
532        let stream = state
533            .streams
534            .entry(header.stream_id)
535            .or_insert_with(|| Http2Stream::new(header.stream_id));
536
537        if stream.state == StreamState::Idle {
538            stream.state = StreamState::Open;
539            stream.request_frame = Some(frame_number);
540        }
541
542        // Process decoded headers if available
543        if let Some(headers) = decoded_headers {
544            Self::process_headers(stream, &headers, direction, frame_number, fields);
545        }
546
547        if header.is_end_stream() {
548            stream.state = match stream.state {
549                StreamState::Open => StreamState::HalfClosedRemote,
550                StreamState::HalfClosedLocal => StreamState::Closed,
551                _ => StreamState::Closed,
552            };
553        }
554    }
555
556    fn process_headers(
557        stream: &mut Http2Stream,
558        headers: &[(Vec<u8>, Vec<u8>)],
559        direction: Direction,
560        frame_number: u64,
561        fields: &mut HashMap<&'static str, OwnedFieldValue>,
562    ) {
563        let mut header_strs = Vec::new();
564
565        for (name, value) in headers {
566            let name_str = String::from_utf8_lossy(name).to_string();
567            let value_str = String::from_utf8_lossy(value).to_string();
568
569            // Extract pseudo-headers
570            match name_str.as_str() {
571                ":method" => {
572                    stream.method = Some(value_str.clone());
573                    fields.insert(
574                        "method",
575                        FieldValue::OwnedString(CompactString::new(&value_str)),
576                    );
577                }
578                ":path" => {
579                    stream.path = Some(value_str.clone());
580                    fields.insert(
581                        "path",
582                        FieldValue::OwnedString(CompactString::new(&value_str)),
583                    );
584                }
585                ":authority" => {
586                    stream.authority = Some(value_str.clone());
587                    fields.insert(
588                        "authority",
589                        FieldValue::OwnedString(CompactString::new(&value_str)),
590                    );
591                }
592                ":scheme" => {
593                    stream.scheme = Some(value_str.clone());
594                    fields.insert(
595                        "scheme",
596                        FieldValue::OwnedString(CompactString::new(&value_str)),
597                    );
598                }
599                ":status" => {
600                    if let Ok(status) = value_str.parse::<u16>() {
601                        stream.status = Some(status);
602                        stream.response_frame = Some(frame_number);
603                        fields.insert("status", FieldValue::UInt16(status));
604                    }
605                }
606                "content-type" => {
607                    fields.insert(
608                        "content_type",
609                        FieldValue::OwnedString(CompactString::new(&value_str)),
610                    );
611                }
612                "content-length" => {
613                    if let Ok(len) = value_str.parse::<u64>() {
614                        fields.insert("content_length", FieldValue::UInt64(len));
615                    }
616                }
617                "user-agent" => {
618                    fields.insert(
619                        "user_agent",
620                        FieldValue::OwnedString(CompactString::new(&value_str)),
621                    );
622                }
623                _ => {}
624            }
625
626            // Store in appropriate list
627            if direction == Direction::ToServer || stream.status.is_none() {
628                stream
629                    .request_headers
630                    .push((name_str.clone(), value_str.clone()));
631            } else {
632                stream
633                    .response_headers
634                    .push((name_str.clone(), value_str.clone()));
635            }
636
637            header_strs.push(format!("{name_str}: {value_str}"));
638        }
639
640        // Store all headers as a semicolon-separated string
641        if !header_strs.is_empty() {
642            let headers_str = header_strs.join("; ");
643            if direction == Direction::ToServer || stream.status.is_none() {
644                fields.insert(
645                    "request_headers",
646                    FieldValue::OwnedString(CompactString::new(&headers_str)),
647                );
648            } else {
649                fields.insert(
650                    "response_headers",
651                    FieldValue::OwnedString(CompactString::new(&headers_str)),
652                );
653            }
654        }
655    }
656
657    fn parse_priority_frame(payload: &[u8], fields: &mut HashMap<&'static str, OwnedFieldValue>) {
658        if payload.len() >= 5 {
659            let dep = u32::from_be_bytes([payload[0] & 0x7F, payload[1], payload[2], payload[3]]);
660            let exclusive = payload[0] & 0x80 != 0;
661            let weight = payload[4];
662
663            fields.insert("priority_exclusive", FieldValue::Bool(exclusive));
664            fields.insert("priority_dependency", FieldValue::UInt32(dep));
665            fields.insert("priority_weight", FieldValue::UInt8(weight));
666        }
667    }
668
669    fn parse_rst_stream_frame(
670        state: &mut Http2ConnectionState,
671        header: &FrameHeader,
672        payload: &[u8],
673        fields: &mut HashMap<&'static str, OwnedFieldValue>,
674    ) {
675        if payload.len() >= 4 {
676            let error_code = u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]]);
677            fields.insert("error_code", FieldValue::UInt32(error_code));
678            fields.insert("error_name", FieldValue::Str(error_codes::name(error_code)));
679        }
680
681        // Update stream state
682        if let Some(stream) = state.streams.get_mut(&header.stream_id) {
683            stream.state = StreamState::Closed;
684        }
685    }
686
687    fn parse_settings_frame(
688        header: &FrameHeader,
689        payload: &[u8],
690        fields: &mut HashMap<&'static str, OwnedFieldValue>,
691    ) {
692        fields.insert("ack", FieldValue::Bool(header.is_ack()));
693
694        if !header.is_ack() {
695            let mut settings_strs = Vec::new();
696            let mut pos = 0;
697            while pos + 6 <= payload.len() {
698                let id = u16::from_be_bytes([payload[pos], payload[pos + 1]]);
699                let value = u32::from_be_bytes([
700                    payload[pos + 2],
701                    payload[pos + 3],
702                    payload[pos + 4],
703                    payload[pos + 5],
704                ]);
705                pos += 6;
706
707                settings_strs.push(format!("{}={}", settings::name(id), value));
708
709                // Store specific settings as individual fields
710                match id {
711                    settings::HEADER_TABLE_SIZE => {
712                        fields.insert("header_table_size", FieldValue::UInt32(value));
713                    }
714                    settings::MAX_CONCURRENT_STREAMS => {
715                        fields.insert("max_concurrent_streams", FieldValue::UInt32(value));
716                    }
717                    settings::INITIAL_WINDOW_SIZE => {
718                        fields.insert("initial_window_size", FieldValue::UInt32(value));
719                    }
720                    settings::MAX_FRAME_SIZE => {
721                        fields.insert("max_frame_size", FieldValue::UInt32(value));
722                    }
723                    _ => {}
724                }
725            }
726
727            if !settings_strs.is_empty() {
728                fields.insert(
729                    "settings",
730                    FieldValue::OwnedString(CompactString::new(settings_strs.join(", "))),
731                );
732            }
733        }
734    }
735
736    fn parse_push_promise_frame(
737        state: &mut Http2ConnectionState,
738        header: &FrameHeader,
739        payload: &[u8],
740        direction: Direction,
741        fields: &mut HashMap<&'static str, OwnedFieldValue>,
742    ) {
743        let mut offset = 0;
744        let mut pad_len = 0;
745
746        if header.is_padded() && !payload.is_empty() {
747            pad_len = payload[0] as usize;
748            offset += 1;
749        }
750
751        if payload.len() >= offset + 4 {
752            let promised_stream_id = u32::from_be_bytes([
753                payload[offset] & 0x7F,
754                payload[offset + 1],
755                payload[offset + 2],
756                payload[offset + 3],
757            ]);
758            offset += 4;
759
760            fields.insert("promised_stream_id", FieldValue::UInt32(promised_stream_id));
761
762            let header_block_end = payload.len().saturating_sub(pad_len);
763            let header_block = &payload[offset.min(header_block_end)..header_block_end];
764
765            if header.is_end_headers() {
766                let decoder = state.get_decoder(direction);
767                if let Ok(headers) = decoder.decode(header_block) {
768                    let stream = state
769                        .streams
770                        .entry(promised_stream_id)
771                        .or_insert_with(|| Http2Stream::new(promised_stream_id));
772                    Self::process_headers(stream, &headers, direction, 0, fields);
773                }
774            }
775        }
776    }
777
778    fn parse_ping_frame(
779        header: &FrameHeader,
780        payload: &[u8],
781        fields: &mut HashMap<&'static str, OwnedFieldValue>,
782    ) {
783        fields.insert("ack", FieldValue::Bool(header.is_ack()));
784
785        if payload.len() >= 8 {
786            let mut data = [0u8; 8];
787            data.copy_from_slice(&payload[..8]);
788            fields.insert("ping_data", FieldValue::OwnedBytes(data.to_vec()));
789        }
790    }
791
792    fn parse_goaway_frame(payload: &[u8], fields: &mut HashMap<&'static str, OwnedFieldValue>) {
793        if payload.len() >= 8 {
794            let last_stream_id =
795                u32::from_be_bytes([payload[0] & 0x7F, payload[1], payload[2], payload[3]]);
796            let error_code = u32::from_be_bytes([payload[4], payload[5], payload[6], payload[7]]);
797
798            fields.insert("last_stream_id", FieldValue::UInt32(last_stream_id));
799            fields.insert("error_code", FieldValue::UInt32(error_code));
800            fields.insert("error_name", FieldValue::Str(error_codes::name(error_code)));
801
802            if payload.len() > 8 {
803                let debug_data = String::from_utf8_lossy(&payload[8..]).to_string();
804                fields.insert(
805                    "debug_data",
806                    FieldValue::OwnedString(CompactString::new(&debug_data)),
807                );
808            }
809        }
810    }
811
812    fn parse_window_update_frame(
813        payload: &[u8],
814        fields: &mut HashMap<&'static str, OwnedFieldValue>,
815    ) {
816        if payload.len() >= 4 {
817            let increment =
818                u32::from_be_bytes([payload[0] & 0x7F, payload[1], payload[2], payload[3]]);
819            fields.insert("window_increment", FieldValue::UInt32(increment));
820        }
821    }
822
823    fn parse_continuation_frame(
824        state: &mut Http2ConnectionState,
825        header: &FrameHeader,
826        payload: &[u8],
827        direction: Direction,
828        fields: &mut HashMap<&'static str, OwnedFieldValue>,
829    ) {
830        fields.insert("end_headers", FieldValue::Bool(header.is_end_headers()));
831
832        // Accumulate header block
833        if let Some((stream_id, ref mut block, saved_dir)) = state.continuation.take() {
834            if stream_id == header.stream_id && saved_dir == direction {
835                block.extend_from_slice(payload);
836
837                if header.is_end_headers() {
838                    // Decode complete header block
839                    let decoder = state.get_decoder(direction);
840                    if let Ok(headers) = decoder.decode(block) {
841                        let stream = state
842                            .streams
843                            .entry(header.stream_id)
844                            .or_insert_with(|| Http2Stream::new(header.stream_id));
845                        Self::process_headers(stream, &headers, direction, 0, fields);
846                    }
847                } else {
848                    // More continuation frames expected
849                    state.continuation = Some((stream_id, block.clone(), saved_dir));
850                }
851            }
852        }
853    }
854}
855
impl Default for Http2StreamParser {
    /// Equivalent to [`Http2StreamParser::new`]: a parser with no tracked
    /// connections.
    fn default() -> Self {
        Self::new()
    }
}
861
862impl StreamParser for Http2StreamParser {
    /// Short identifier of this parser: `"http2"`.
    fn name(&self) -> &'static str {
        "http2"
    }
866
    /// Human-readable protocol name: `"HTTP/2"`.
    fn display_name(&self) -> &'static str {
        "HTTP/2"
    }
870
871    fn can_parse_stream(&self, context: &StreamContext) -> bool {
872        // HTTP/2 is typically used with ALPN "h2" or on decrypted TLS streams
873        if let Some(ref alpn) = context.alpn {
874            return alpn == "h2" || alpn == "h2c";
875        }
876        false
877    }
878
879    fn parse_stream(&self, data: &[u8], context: &StreamContext) -> StreamParseResult {
880        let mut connections = self.connections.lock().unwrap();
881        let state = connections
882            .entry(context.connection_id)
883            .or_insert_with(Http2ConnectionState::new);
884
885        // Track buffer size before adding new data
886        let buffer_len_before = state.buffer.len();
887
888        // Append new data to buffer
889        state.buffer.extend_from_slice(data);
890
891        // Check for connection preface (client side)
892        if context.direction == Direction::ToServer && !state.client_preface_seen {
893            if state.buffer.len() >= CONNECTION_PREFACE.len() {
894                if &state.buffer[..CONNECTION_PREFACE.len()] == CONNECTION_PREFACE {
895                    state.buffer.advance(CONNECTION_PREFACE.len());
896                    state.client_preface_seen = true;
897                } else {
898                    // Not HTTP/2
899                    return StreamParseResult::NotThisProtocol;
900                }
901            } else {
902                // Need more data
903                return StreamParseResult::NeedMore {
904                    minimum_bytes: Some(CONNECTION_PREFACE.len()),
905                };
906            }
907        }
908
909        let mut messages = Vec::new();
910        // Calculate bytes consumed: (old buffer + new data) - remaining buffer
911        // This is computed at the end after processing frames
912        let total_input = buffer_len_before + data.len();
913
914        // Parse complete frames
915        loop {
916            if state.buffer.len() < FrameHeader::SIZE {
917                break; // Need more data for header
918            }
919
920            let header = FrameHeader::parse(&state.buffer).unwrap();
921            let total_frame_len = FrameHeader::SIZE + header.length as usize;
922
923            if state.buffer.len() < total_frame_len {
924                break; // Need more data for payload
925            }
926
927            let payload = state.buffer[FrameHeader::SIZE..total_frame_len].to_vec();
928            state.buffer.advance(total_frame_len);
929
930            // Track server preface (first SETTINGS frame from server)
931            if context.direction == Direction::ToClient
932                && !state.server_preface_seen
933                && header.frame_type == FrameType::Settings
934            {
935                state.server_preface_seen = true;
936            }
937
938            // Parse the frame
939            let fields = Self::parse_frame(state, &header, &payload, context.direction, 0);
940
941            let message = ParsedMessage {
942                protocol: "http2",
943                connection_id: context.connection_id,
944                message_id: context.messages_parsed as u32 + messages.len() as u32,
945                direction: context.direction,
946                frame_number: 0, // Will be set by manager
947                fields,
948            };
949            messages.push(message);
950        }
951
952        // Calculate total bytes consumed: (old buffer + new data) - remaining buffer
953        let total_consumed = total_input - state.buffer.len();
954
955        if !messages.is_empty() {
956            StreamParseResult::Complete {
957                messages,
958                bytes_consumed: total_consumed,
959            }
960        } else if total_consumed == 0 {
961            StreamParseResult::NeedMore {
962                minimum_bytes: Some(FrameHeader::SIZE),
963            }
964        } else {
965            StreamParseResult::Complete {
966                messages: vec![],
967                bytes_consumed: total_consumed,
968            }
969        }
970    }
971
972    fn message_schema(&self) -> Vec<FieldDescriptor> {
973        vec![
974            // Frame info
975            FieldDescriptor::new("connection_id", DataKind::UInt64),
976            FieldDescriptor::new("frame_type", DataKind::String),
977            FieldDescriptor::new("stream_id", DataKind::UInt32),
978            FieldDescriptor::new("flags", DataKind::UInt8),
979            FieldDescriptor::new("length", DataKind::UInt32),
980            // Request
981            FieldDescriptor::new("method", DataKind::String).set_nullable(true),
982            FieldDescriptor::new("path", DataKind::String).set_nullable(true),
983            FieldDescriptor::new("authority", DataKind::String).set_nullable(true),
984            FieldDescriptor::new("scheme", DataKind::String).set_nullable(true),
985            // Response
986            FieldDescriptor::new("status", DataKind::UInt16).set_nullable(true),
987            // Headers
988            FieldDescriptor::new("request_headers", DataKind::String).set_nullable(true),
989            FieldDescriptor::new("response_headers", DataKind::String).set_nullable(true),
990            // Common headers
991            FieldDescriptor::new("content_type", DataKind::String).set_nullable(true),
992            FieldDescriptor::new("content_length", DataKind::UInt64).set_nullable(true),
993            FieldDescriptor::new("user_agent", DataKind::String).set_nullable(true),
994            // Data frame
995            FieldDescriptor::new("data_length", DataKind::UInt64).set_nullable(true),
996            FieldDescriptor::new("end_stream", DataKind::Bool).set_nullable(true),
997            FieldDescriptor::new("end_headers", DataKind::Bool).set_nullable(true),
998            // Settings
999            FieldDescriptor::new("settings", DataKind::String).set_nullable(true),
1000            FieldDescriptor::new("ack", DataKind::Bool).set_nullable(true),
1001            // Error handling
1002            FieldDescriptor::new("error_code", DataKind::UInt32).set_nullable(true),
1003            FieldDescriptor::new("error_name", DataKind::String).set_nullable(true),
1004            // GoAway
1005            FieldDescriptor::new("last_stream_id", DataKind::UInt32).set_nullable(true),
1006            FieldDescriptor::new("debug_data", DataKind::String).set_nullable(true),
1007            // Window update
1008            FieldDescriptor::new("window_increment", DataKind::UInt32).set_nullable(true),
1009            // Priority
1010            FieldDescriptor::new("priority_exclusive", DataKind::Bool).set_nullable(true),
1011            FieldDescriptor::new("priority_dependency", DataKind::UInt32).set_nullable(true),
1012            FieldDescriptor::new("priority_weight", DataKind::UInt8).set_nullable(true),
1013            // Push promise
1014            FieldDescriptor::new("promised_stream_id", DataKind::UInt32).set_nullable(true),
1015            // Stream state
1016            FieldDescriptor::new("stream_state", DataKind::String).set_nullable(true),
1017        ]
1018    }
1019}
1020
#[cfg(test)]
mod tests {
    //! Unit tests for the HTTP/2 frame types, frame header decoding and the
    //! stream parser entry point.

    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Client-to-server context on connection 1 with ALPN `h2`.
    fn test_context() -> StreamContext {
        StreamContext {
            connection_id: 1,
            direction: Direction::ToServer,
            src_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
            dst_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
            src_port: 54321,
            dst_port: 443,
            bytes_parsed: 0,
            messages_parsed: 0,
            alpn: Some("h2".to_string()),
        }
    }

    /// Same as [`test_context`] but for the server-to-client direction, which
    /// is not subject to the connection-preface check.
    fn to_client_context() -> StreamContext {
        StreamContext {
            direction: Direction::ToClient,
            ..test_context()
        }
    }

    /// Unwraps a `Complete` result into its messages; any other result fails
    /// the test and prints what was actually returned.
    fn complete_messages(result: StreamParseResult) -> Vec<ParsedMessage> {
        match result {
            StreamParseResult::Complete { messages, .. } => messages,
            other => panic!("Expected Complete, got {:?}", other),
        }
    }

    #[test]
    fn test_frame_type_from_u8() {
        assert_eq!(FrameType::from(0x0), FrameType::Data);
        assert_eq!(FrameType::from(0x1), FrameType::Headers);
        assert_eq!(FrameType::from(0x4), FrameType::Settings);
        assert_eq!(FrameType::from(0x7), FrameType::GoAway);
        assert!(matches!(FrameType::from(0xFF), FrameType::Unknown(0xFF)));
    }

    #[test]
    fn test_frame_type_as_str() {
        assert_eq!(FrameType::Data.as_str(), "DATA");
        assert_eq!(FrameType::Headers.as_str(), "HEADERS");
        assert_eq!(FrameType::Settings.as_str(), "SETTINGS");
        assert_eq!(FrameType::Unknown(99).as_str(), "UNKNOWN");
    }

    #[test]
    fn test_frame_header_parse() {
        // SETTINGS frame, length=6, no flags, stream 0
        let data = [0x00, 0x00, 0x06, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00];
        let header = FrameHeader::parse(&data).unwrap();
        assert_eq!(header.length, 6);
        assert_eq!(header.frame_type, FrameType::Settings);
        assert_eq!(header.flags, 0);
        assert_eq!(header.stream_id, 0);
    }

    #[test]
    fn test_frame_header_parse_with_stream_id() {
        // DATA frame on stream 1, length=16, END_STREAM set
        let data = [0x00, 0x00, 0x10, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01];
        let header = FrameHeader::parse(&data).unwrap();
        assert_eq!(header.length, 16);
        assert_eq!(header.frame_type, FrameType::Data);
        assert!(header.is_end_stream());
        assert_eq!(header.stream_id, 1);
    }

    #[test]
    fn test_frame_header_flags() {
        // HEADERS frame on stream 1; byte 4 is the flags octet under test.
        let mut data = [0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01];

        // Test END_STREAM
        data[4] = flags::END_STREAM;
        let header = FrameHeader::parse(&data).unwrap();
        assert!(header.is_end_stream());
        assert!(!header.is_end_headers());

        // Test END_HEADERS
        data[4] = flags::END_HEADERS;
        let header = FrameHeader::parse(&data).unwrap();
        assert!(header.is_end_headers());
        assert!(!header.is_end_stream());

        // Test PADDED
        data[4] = flags::PADDED;
        let header = FrameHeader::parse(&data).unwrap();
        assert!(header.is_padded());

        // Test PRIORITY
        data[4] = flags::PRIORITY;
        let header = FrameHeader::parse(&data).unwrap();
        assert!(header.is_priority());
    }

    #[test]
    fn test_error_code_names() {
        assert_eq!(error_codes::name(error_codes::NO_ERROR), "NO_ERROR");
        assert_eq!(
            error_codes::name(error_codes::PROTOCOL_ERROR),
            "PROTOCOL_ERROR"
        );
        assert_eq!(error_codes::name(error_codes::CANCEL), "CANCEL");
        assert_eq!(error_codes::name(0xFFFF), "UNKNOWN");
    }

    #[test]
    fn test_settings_names() {
        assert_eq!(
            settings::name(settings::HEADER_TABLE_SIZE),
            "HEADER_TABLE_SIZE"
        );
        assert_eq!(settings::name(settings::MAX_FRAME_SIZE), "MAX_FRAME_SIZE");
        assert_eq!(settings::name(0xFFFF), "UNKNOWN");
    }

    #[test]
    fn test_http2_stream_state() {
        assert_eq!(StreamState::Idle.as_str(), "idle");
        assert_eq!(StreamState::Open.as_str(), "open");
        assert_eq!(StreamState::Closed.as_str(), "closed");
    }

    #[test]
    fn test_http2_stream_new() {
        let stream = Http2Stream::new(1);
        assert_eq!(stream.stream_id, 1);
        assert_eq!(stream.state, StreamState::Idle);
        assert!(stream.method.is_none());
        assert!(stream.status.is_none());
    }

    #[test]
    fn test_parser_can_parse_stream() {
        let parser = Http2StreamParser::new();

        // Should parse with h2 ALPN
        let mut ctx = test_context();
        ctx.alpn = Some("h2".to_string());
        assert!(parser.can_parse_stream(&ctx));

        // Should parse with h2c ALPN
        ctx.alpn = Some("h2c".to_string());
        assert!(parser.can_parse_stream(&ctx));

        // Should not parse without ALPN
        ctx.alpn = None;
        assert!(!parser.can_parse_stream(&ctx));

        // Should not parse with different ALPN
        ctx.alpn = Some("http/1.1".to_string());
        assert!(!parser.can_parse_stream(&ctx));
    }

    #[test]
    fn test_parse_connection_preface() {
        let ctx = test_context();

        // Test 1: Partial preface should need more data
        let parser1 = Http2StreamParser::new();
        let partial = &CONNECTION_PREFACE[..10];
        let result = parser1.parse_stream(partial, &ctx);
        assert!(matches!(result, StreamParseResult::NeedMore { .. }));

        // Test 2: Full preface followed by SETTINGS should parse (fresh parser)
        let parser2 = Http2StreamParser::new();
        let mut data = CONNECTION_PREFACE.to_vec();
        // SETTINGS frame, length=0, ACK flag
        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00]);

        let messages = complete_messages(parser2.parse_stream(&data, &ctx));
        assert_eq!(messages.len(), 1);
        assert_eq!(
            messages[0].fields.get("frame_type"),
            Some(&FieldValue::Str("SETTINGS"))
        );
    }

    #[test]
    fn test_non_http2_client_stream_is_rejected() {
        let parser = Http2StreamParser::new();
        let ctx = test_context();

        // Longer than the connection preface, but clearly not HTTP/2.
        let data = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
        assert!(data.len() >= CONNECTION_PREFACE.len());

        let result = parser.parse_stream(data, &ctx);
        assert!(matches!(result, StreamParseResult::NotThisProtocol));
    }

    #[test]
    fn test_parse_settings_frame() {
        let parser = Http2StreamParser::new();
        let ctx = to_client_context(); // Server sends first SETTINGS

        // SETTINGS frame with HEADER_TABLE_SIZE=4096, MAX_CONCURRENT_STREAMS=100
        let frame = [
            0x00, 0x00, 0x0c, // length = 12
            0x04, // type = SETTINGS
            0x00, // flags = 0
            0x00, 0x00, 0x00, 0x00, // stream_id = 0
            0x00, 0x01, 0x00, 0x00, 0x10, 0x00, // HEADER_TABLE_SIZE = 4096
            0x00, 0x03, 0x00, 0x00, 0x00, 0x64, // MAX_CONCURRENT_STREAMS = 100
        ];

        let messages = complete_messages(parser.parse_stream(&frame, &ctx));
        assert_eq!(messages.len(), 1);
        assert_eq!(
            messages[0].fields.get("header_table_size"),
            Some(&FieldValue::UInt32(4096))
        );
        assert_eq!(
            messages[0].fields.get("max_concurrent_streams"),
            Some(&FieldValue::UInt32(100))
        );
    }

    #[test]
    fn test_parse_ping_frame() {
        let parser = Http2StreamParser::new();
        let ctx = to_client_context();

        // PING frame
        let frame = [
            0x00, 0x00, 0x08, // length = 8
            0x06, // type = PING
            0x00, // flags = 0
            0x00, 0x00, 0x00, 0x00, // stream_id = 0
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // opaque data
        ];

        let messages = complete_messages(parser.parse_stream(&frame, &ctx));
        assert_eq!(messages.len(), 1);
        assert_eq!(
            messages[0].fields.get("frame_type"),
            Some(&FieldValue::Str("PING"))
        );
        assert_eq!(
            messages[0].fields.get("ack"),
            Some(&FieldValue::Bool(false))
        );
    }

    #[test]
    fn test_parse_window_update_frame() {
        let parser = Http2StreamParser::new();
        let ctx = to_client_context();

        // WINDOW_UPDATE frame, increment = 65535
        let frame = [
            0x00, 0x00, 0x04, // length = 4
            0x08, // type = WINDOW_UPDATE
            0x00, // flags = 0
            0x00, 0x00, 0x00, 0x00, // stream_id = 0
            0x00, 0x00, 0xff, 0xff, // increment = 65535
        ];

        let messages = complete_messages(parser.parse_stream(&frame, &ctx));
        assert_eq!(messages.len(), 1);
        assert_eq!(
            messages[0].fields.get("window_increment"),
            Some(&FieldValue::UInt32(65535))
        );
    }

    #[test]
    fn test_parse_goaway_frame() {
        let parser = Http2StreamParser::new();
        let ctx = to_client_context();

        // GOAWAY frame, last_stream_id=1, error=NO_ERROR, debug="bye"
        let frame = [
            0x00, 0x00, 0x0b, // length = 11
            0x07, // type = GOAWAY
            0x00, // flags = 0
            0x00, 0x00, 0x00, 0x00, // stream_id = 0
            0x00, 0x00, 0x00, 0x01, // last_stream_id = 1
            0x00, 0x00, 0x00, 0x00, // error_code = NO_ERROR
            b'b', b'y', b'e', // debug data
        ];

        let messages = complete_messages(parser.parse_stream(&frame, &ctx));
        assert_eq!(messages.len(), 1);
        assert_eq!(
            messages[0].fields.get("last_stream_id"),
            Some(&FieldValue::UInt32(1))
        );
        assert_eq!(
            messages[0].fields.get("error_name"),
            Some(&FieldValue::Str("NO_ERROR"))
        );
    }

    #[test]
    fn test_parse_data_frame() {
        let parser = Http2StreamParser::new();
        let ctx = to_client_context();

        // DATA frame on stream 1, END_STREAM
        let mut frame = vec![
            0x00, 0x00, 0x05, // length = 5
            0x00, // type = DATA
            0x01, // flags = END_STREAM
            0x00, 0x00, 0x00, 0x01, // stream_id = 1
        ];
        frame.extend_from_slice(b"hello");

        let messages = complete_messages(parser.parse_stream(&frame, &ctx));
        assert_eq!(messages.len(), 1);
        assert_eq!(
            messages[0].fields.get("data_length"),
            Some(&FieldValue::UInt64(5))
        );
        assert_eq!(
            messages[0].fields.get("end_stream"),
            Some(&FieldValue::Bool(true))
        );
    }

    #[test]
    fn test_parse_rst_stream_frame() {
        let parser = Http2StreamParser::new();
        let ctx = to_client_context();

        // RST_STREAM frame, stream 1, CANCEL error
        let frame = [
            0x00, 0x00, 0x04, // length = 4
            0x03, // type = RST_STREAM
            0x00, // flags = 0
            0x00, 0x00, 0x00, 0x01, // stream_id = 1
            0x00, 0x00, 0x00, 0x08, // error_code = CANCEL (8)
        ];

        let messages = complete_messages(parser.parse_stream(&frame, &ctx));
        assert_eq!(messages.len(), 1);
        assert_eq!(
            messages[0].fields.get("error_code"),
            Some(&FieldValue::UInt32(8))
        );
        assert_eq!(
            messages[0].fields.get("error_name"),
            Some(&FieldValue::Str("CANCEL"))
        );
    }

    #[test]
    fn test_incomplete_frame_needs_more() {
        let parser = Http2StreamParser::new();
        let ctx = to_client_context();

        // Only partial frame header
        let partial = [0x00, 0x00, 0x10, 0x00];
        let result = parser.parse_stream(&partial, &ctx);
        assert!(matches!(result, StreamParseResult::NeedMore { .. }));
    }

    #[test]
    fn test_multiple_frames() {
        let parser = Http2StreamParser::new();
        let ctx = to_client_context();

        // Two frames: SETTINGS ACK + PING
        let mut data = vec![
            // SETTINGS ACK
            0x00, 0x00, 0x00, // length = 0
            0x04, // type = SETTINGS
            0x01, // flags = ACK
            0x00, 0x00, 0x00, 0x00, // stream_id = 0
            // PING
            0x00, 0x00, 0x08, // length = 8
            0x06, // type = PING
            0x00, // flags = 0
            0x00, 0x00, 0x00, 0x00, // stream_id = 0
        ];
        data.extend_from_slice(&[0u8; 8]); // PING data

        let messages = complete_messages(parser.parse_stream(&data, &ctx));
        assert_eq!(messages.len(), 2);
        assert_eq!(
            messages[0].fields.get("frame_type"),
            Some(&FieldValue::Str("SETTINGS"))
        );
        assert_eq!(
            messages[1].fields.get("frame_type"),
            Some(&FieldValue::Str("PING"))
        );
    }
}