1use std::collections::HashMap;
7use std::sync::{Arc, Mutex};
8
9use bytes::{Buf, BytesMut};
10use compact_str::CompactString;
11use hpack::Decoder as HpackDecoder;
12
13use crate::protocol::{FieldValue, OwnedFieldValue};
14use crate::schema::{DataKind, FieldDescriptor};
15use crate::stream::{Direction, ParsedMessage, StreamContext, StreamParseResult, StreamParser};
16
/// Byte sequence expected at the very start of client-to-server data.
///
/// `parse_stream` requires the first client bytes of a connection to equal this
/// sequence and strips it before any frame parsing takes place.
pub const CONNECTION_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
19
/// HTTP/2 frame type, decoded from the type octet of a frame header.
///
/// Codes without a dedicated variant are preserved in [`FrameType::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameType {
    Data,
    Headers,
    Priority,
    RstStream,
    Settings,
    PushPromise,
    Ping,
    GoAway,
    WindowUpdate,
    Continuation,
    /// A type code outside `0x0..=0x9`; carries the raw code.
    Unknown(u8),
}

impl From<u8> for FrameType {
    /// Maps a raw type code to its variant; codes above `0x9` become
    /// `Unknown` carrying the original value.
    fn from(v: u8) -> Self {
        // Position in this table is the wire code of the variant.
        const BY_CODE: [FrameType; 10] = [
            FrameType::Data,
            FrameType::Headers,
            FrameType::Priority,
            FrameType::RstStream,
            FrameType::Settings,
            FrameType::PushPromise,
            FrameType::Ping,
            FrameType::GoAway,
            FrameType::WindowUpdate,
            FrameType::Continuation,
        ];

        match BY_CODE.get(usize::from(v)) {
            Some(known) => *known,
            None => FrameType::Unknown(v),
        }
    }
}

impl FrameType {
    /// Returns the upper-case frame name (for example `"RST_STREAM"`).
    ///
    /// Every `Unknown` code yields `"UNKNOWN"`; the raw code is not included.
    pub fn as_str(&self) -> &'static str {
        match *self {
            FrameType::Unknown(_) => "UNKNOWN",
            FrameType::Data => "DATA",
            FrameType::Headers => "HEADERS",
            FrameType::Priority => "PRIORITY",
            FrameType::RstStream => "RST_STREAM",
            FrameType::Settings => "SETTINGS",
            FrameType::PushPromise => "PUSH_PROMISE",
            FrameType::Ping => "PING",
            FrameType::GoAway => "GOAWAY",
            FrameType::WindowUpdate => "WINDOW_UPDATE",
            FrameType::Continuation => "CONTINUATION",
        }
    }
}
72
/// Bit masks for the flags octet of a frame header.
///
/// `ACK` and `END_STREAM` share the same bit; which one applies depends on the
/// frame type (in this file `ACK` is read for SETTINGS and PING frames).
pub mod flags {
    /// Bit 0.
    pub const END_STREAM: u8 = 1 << 0;
    /// Bit 2.
    pub const END_HEADERS: u8 = 1 << 2;
    /// Bit 3.
    pub const PADDED: u8 = 1 << 3;
    /// Bit 5.
    pub const PRIORITY: u8 = 1 << 5;
    /// Bit 0 (same bit as `END_STREAM`).
    pub const ACK: u8 = 1 << 0;
}
81
/// Numeric error codes carried by RST_STREAM and GOAWAY frames, plus a
/// code-to-name lookup.
pub mod error_codes {
    pub const NO_ERROR: u32 = 0x0;
    pub const PROTOCOL_ERROR: u32 = 0x1;
    pub const INTERNAL_ERROR: u32 = 0x2;
    pub const FLOW_CONTROL_ERROR: u32 = 0x3;
    pub const SETTINGS_TIMEOUT: u32 = 0x4;
    pub const STREAM_CLOSED: u32 = 0x5;
    pub const FRAME_SIZE_ERROR: u32 = 0x6;
    pub const REFUSED_STREAM: u32 = 0x7;
    pub const CANCEL: u32 = 0x8;
    pub const COMPRESSION_ERROR: u32 = 0x9;
    pub const CONNECT_ERROR: u32 = 0xa;
    pub const ENHANCE_YOUR_CALM: u32 = 0xb;
    pub const INADEQUATE_SECURITY: u32 = 0xc;
    pub const HTTP_1_1_REQUIRED: u32 = 0xd;

    /// Returns the symbolic name of `code`, or `"UNKNOWN"` when the code is
    /// not one of the constants defined in this module.
    pub fn name(code: u32) -> &'static str {
        const TABLE: [(u32, &str); 14] = [
            (NO_ERROR, "NO_ERROR"),
            (PROTOCOL_ERROR, "PROTOCOL_ERROR"),
            (INTERNAL_ERROR, "INTERNAL_ERROR"),
            (FLOW_CONTROL_ERROR, "FLOW_CONTROL_ERROR"),
            (SETTINGS_TIMEOUT, "SETTINGS_TIMEOUT"),
            (STREAM_CLOSED, "STREAM_CLOSED"),
            (FRAME_SIZE_ERROR, "FRAME_SIZE_ERROR"),
            (REFUSED_STREAM, "REFUSED_STREAM"),
            (CANCEL, "CANCEL"),
            (COMPRESSION_ERROR, "COMPRESSION_ERROR"),
            (CONNECT_ERROR, "CONNECT_ERROR"),
            (ENHANCE_YOUR_CALM, "ENHANCE_YOUR_CALM"),
            (INADEQUATE_SECURITY, "INADEQUATE_SECURITY"),
            (HTTP_1_1_REQUIRED, "HTTP_1_1_REQUIRED"),
        ];

        for &(known, label) in TABLE.iter() {
            if known == code {
                return label;
            }
        }
        "UNKNOWN"
    }
}
119
/// Identifiers of SETTINGS parameters, plus an identifier-to-name lookup.
pub mod settings {
    pub const HEADER_TABLE_SIZE: u16 = 0x1;
    pub const ENABLE_PUSH: u16 = 0x2;
    pub const MAX_CONCURRENT_STREAMS: u16 = 0x3;
    pub const INITIAL_WINDOW_SIZE: u16 = 0x4;
    pub const MAX_FRAME_SIZE: u16 = 0x5;
    pub const MAX_HEADER_LIST_SIZE: u16 = 0x6;

    /// Returns the symbolic name of setting `id`, or `"UNKNOWN"` when the
    /// identifier is not one of the constants defined in this module.
    pub fn name(id: u16) -> &'static str {
        const TABLE: [(u16, &str); 6] = [
            (HEADER_TABLE_SIZE, "HEADER_TABLE_SIZE"),
            (ENABLE_PUSH, "ENABLE_PUSH"),
            (MAX_CONCURRENT_STREAMS, "MAX_CONCURRENT_STREAMS"),
            (INITIAL_WINDOW_SIZE, "INITIAL_WINDOW_SIZE"),
            (MAX_FRAME_SIZE, "MAX_FRAME_SIZE"),
            (MAX_HEADER_LIST_SIZE, "MAX_HEADER_LIST_SIZE"),
        ];

        for &(known, label) in TABLE.iter() {
            if known == id {
                return label;
            }
        }
        "UNKNOWN"
    }
}
141
142#[derive(Debug, Clone)]
144pub struct FrameHeader {
145 pub length: u32,
146 pub frame_type: FrameType,
147 pub flags: u8,
148 pub stream_id: u32,
149}
150
151impl FrameHeader {
152 pub const SIZE: usize = 9;
153
154 pub fn parse(data: &[u8]) -> Option<Self> {
155 if data.len() < Self::SIZE {
156 return None;
157 }
158
159 let length = ((data[0] as u32) << 16) | ((data[1] as u32) << 8) | (data[2] as u32);
160 let frame_type = FrameType::from(data[3]);
161 let flags = data[4];
162 let stream_id = ((data[5] as u32 & 0x7F) << 24)
163 | ((data[6] as u32) << 16)
164 | ((data[7] as u32) << 8)
165 | (data[8] as u32);
166
167 Some(FrameHeader {
168 length,
169 frame_type,
170 flags,
171 stream_id,
172 })
173 }
174
175 pub fn is_end_stream(&self) -> bool {
177 self.flags & flags::END_STREAM != 0
178 }
179
180 pub fn is_end_headers(&self) -> bool {
182 self.flags & flags::END_HEADERS != 0
183 }
184
185 pub fn is_padded(&self) -> bool {
187 self.flags & flags::PADDED != 0
188 }
189
190 pub fn is_priority(&self) -> bool {
192 self.flags & flags::PRIORITY != 0
193 }
194
195 pub fn is_ack(&self) -> bool {
197 self.flags & flags::ACK != 0
198 }
199}
200
/// Stream priority information: exclusivity flag, parent stream and weight.
///
/// Not referenced by the code visible in this part of the file, hence the
/// `dead_code` allowance.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PriorityData {
    /// Whether the dependency is exclusive.
    pub exclusive: bool,
    /// Identifier of the stream this one depends on.
    pub stream_dependency: u32,
    /// Raw weight byte.
    pub weight: u8,
}
209
/// Lifecycle state tracked for each HTTP/2 stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Initial state of a freshly created stream record.
    Idle,
    Open,
    // Never assigned by the visible code, only matched on.
    #[allow(dead_code)]
    HalfClosedLocal,
    HalfClosedRemote,
    Closed,
}

impl StreamState {
    /// Returns the lower-case, human-readable label of the state.
    pub fn as_str(&self) -> &'static str {
        match *self {
            StreamState::Closed => "closed",
            StreamState::HalfClosedRemote => "half-closed (remote)",
            StreamState::HalfClosedLocal => "half-closed (local)",
            StreamState::Open => "open",
            StreamState::Idle => "idle",
        }
    }
}
232
233#[derive(Debug, Clone)]
235pub struct Http2Stream {
236 #[allow(dead_code)] pub stream_id: u32,
238 pub state: StreamState,
239
240 pub method: Option<String>,
242 pub path: Option<String>,
243 pub authority: Option<String>,
244 pub scheme: Option<String>,
245 pub request_headers: Vec<(String, String)>,
246 pub request_body_len: usize,
247
248 pub status: Option<u16>,
250 pub response_headers: Vec<(String, String)>,
251 pub response_body_len: usize,
252
253 pub request_frame: Option<u64>,
255 pub response_frame: Option<u64>,
256}
257
258impl Http2Stream {
259 pub fn new(stream_id: u32) -> Self {
260 Self {
261 stream_id,
262 state: StreamState::Idle,
263 method: None,
264 path: None,
265 authority: None,
266 scheme: None,
267 request_headers: Vec::new(),
268 request_body_len: 0,
269 status: None,
270 response_headers: Vec::new(),
271 response_body_len: 0,
272 request_frame: None,
273 response_frame: None,
274 }
275 }
276
277 #[allow(dead_code)]
279 pub fn get_header(&self, name: &str) -> Option<&str> {
280 for (n, v) in &self.request_headers {
282 if n.eq_ignore_ascii_case(name) {
283 return Some(v);
284 }
285 }
286 for (n, v) in &self.response_headers {
287 if n.eq_ignore_ascii_case(name) {
288 return Some(v);
289 }
290 }
291 None
292 }
293}
294
295struct Http2ConnectionState {
297 client_decoder: HpackDecoder<'static>,
299 server_decoder: HpackDecoder<'static>,
300
301 streams: HashMap<u32, Http2Stream>,
303
304 continuation: Option<(u32, Vec<u8>, Direction)>,
306
307 buffer: BytesMut,
309
310 client_preface_seen: bool,
312
313 server_preface_seen: bool,
315}
316
317impl Http2ConnectionState {
318 fn new() -> Self {
319 Self {
320 client_decoder: HpackDecoder::new(),
321 server_decoder: HpackDecoder::new(),
322 streams: HashMap::new(),
323 continuation: None,
324 buffer: BytesMut::new(),
325 client_preface_seen: false,
326 server_preface_seen: false,
327 }
328 }
329
330 fn get_decoder(&mut self, direction: Direction) -> &mut HpackDecoder<'static> {
331 match direction {
332 Direction::ToServer => &mut self.client_decoder,
333 Direction::ToClient => &mut self.server_decoder,
334 }
335 }
336}
337
/// HTTP/2 stream parser holding one `Http2ConnectionState` per connection id.
pub struct Http2StreamParser {
    /// Connection states keyed by connection id; the mutex lets the parser be
    /// driven through `&self`.
    connections: Arc<Mutex<HashMap<u64, Http2ConnectionState>>>,
}
343
344impl Http2StreamParser {
345 pub fn new() -> Self {
346 Self {
347 connections: Arc::new(Mutex::new(HashMap::new())),
348 }
349 }
350
351 pub fn remove_connection(&self, connection_id: u64) {
353 let mut connections = self.connections.lock().unwrap();
354 connections.remove(&connection_id);
355 }
356
357 fn parse_frame(
359 state: &mut Http2ConnectionState,
360 header: &FrameHeader,
361 payload: &[u8],
362 direction: Direction,
363 frame_number: u64,
364 ) -> HashMap<&'static str, OwnedFieldValue> {
365 let mut fields = HashMap::new();
366
367 fields.insert("frame_type", FieldValue::Str(header.frame_type.as_str()));
368 fields.insert("stream_id", FieldValue::UInt32(header.stream_id));
369 fields.insert("flags", FieldValue::UInt8(header.flags));
370 fields.insert("length", FieldValue::UInt32(header.length));
371
372 match header.frame_type {
373 FrameType::Data => {
374 Self::parse_data_frame(
375 state,
376 header,
377 payload,
378 direction,
379 frame_number,
380 &mut fields,
381 );
382 }
383 FrameType::Headers => {
384 Self::parse_headers_frame(
385 state,
386 header,
387 payload,
388 direction,
389 frame_number,
390 &mut fields,
391 );
392 }
393 FrameType::Priority => {
394 Self::parse_priority_frame(payload, &mut fields);
395 }
396 FrameType::RstStream => {
397 Self::parse_rst_stream_frame(state, header, payload, &mut fields);
398 }
399 FrameType::Settings => {
400 Self::parse_settings_frame(header, payload, &mut fields);
401 }
402 FrameType::PushPromise => {
403 Self::parse_push_promise_frame(state, header, payload, direction, &mut fields);
404 }
405 FrameType::Ping => {
406 Self::parse_ping_frame(header, payload, &mut fields);
407 }
408 FrameType::GoAway => {
409 Self::parse_goaway_frame(payload, &mut fields);
410 }
411 FrameType::WindowUpdate => {
412 Self::parse_window_update_frame(payload, &mut fields);
413 }
414 FrameType::Continuation => {
415 Self::parse_continuation_frame(state, header, payload, direction, &mut fields);
416 }
417 FrameType::Unknown(t) => {
418 fields.insert("unknown_type", FieldValue::UInt8(t));
419 }
420 }
421
422 if header.stream_id != 0 {
424 if let Some(stream) = state.streams.get(&header.stream_id) {
425 fields.insert("stream_state", FieldValue::Str(stream.state.as_str()));
426 }
427 }
428
429 fields
430 }
431
432 fn parse_data_frame(
433 state: &mut Http2ConnectionState,
434 header: &FrameHeader,
435 payload: &[u8],
436 direction: Direction,
437 _frame_number: u64,
438 fields: &mut HashMap<&'static str, OwnedFieldValue>,
439 ) {
440 let (data, pad_len) = if header.is_padded() && !payload.is_empty() {
441 let pad_len = payload[0] as usize;
442 if pad_len < payload.len() {
443 (&payload[1..payload.len() - pad_len], pad_len)
444 } else {
445 (&payload[1..], 0)
446 }
447 } else {
448 (payload, 0)
449 };
450
451 fields.insert("data_length", FieldValue::UInt64(data.len() as u64));
452 if pad_len > 0 {
453 fields.insert("padding_length", FieldValue::UInt8(pad_len as u8));
454 }
455 fields.insert("end_stream", FieldValue::Bool(header.is_end_stream()));
456
457 let stream = state
459 .streams
460 .entry(header.stream_id)
461 .or_insert_with(|| Http2Stream::new(header.stream_id));
462
463 match direction {
464 Direction::ToServer => {
465 stream.request_body_len += data.len();
466 }
467 Direction::ToClient => {
468 stream.response_body_len += data.len();
469 }
470 }
471
472 if header.is_end_stream() {
473 stream.state = match stream.state {
474 StreamState::Open => StreamState::HalfClosedRemote,
475 StreamState::HalfClosedLocal => StreamState::Closed,
476 _ => StreamState::Closed,
477 };
478 }
479 }
480
481 fn parse_headers_frame(
482 state: &mut Http2ConnectionState,
483 header: &FrameHeader,
484 payload: &[u8],
485 direction: Direction,
486 frame_number: u64,
487 fields: &mut HashMap<&'static str, OwnedFieldValue>,
488 ) {
489 let mut offset = 0;
490 let mut pad_len = 0;
491
492 if header.is_padded() && !payload.is_empty() {
493 pad_len = payload[0] as usize;
494 offset += 1;
495 }
496
497 if header.is_priority() && payload.len() >= offset + 5 {
499 let dep_bytes = &payload[offset..offset + 4];
500 let dep = u32::from_be_bytes([
501 dep_bytes[0] & 0x7F,
502 dep_bytes[1],
503 dep_bytes[2],
504 dep_bytes[3],
505 ]);
506 let exclusive = dep_bytes[0] & 0x80 != 0;
507 let weight = payload[offset + 4];
508 offset += 5;
509
510 fields.insert("priority_exclusive", FieldValue::Bool(exclusive));
511 fields.insert("priority_dependency", FieldValue::UInt32(dep));
512 fields.insert("priority_weight", FieldValue::UInt8(weight));
513 }
514
515 let header_block_end = payload.len().saturating_sub(pad_len);
516 let header_block = &payload[offset.min(header_block_end)..header_block_end];
517
518 fields.insert("end_stream", FieldValue::Bool(header.is_end_stream()));
519 fields.insert("end_headers", FieldValue::Bool(header.is_end_headers()));
520
521 let decoded_headers = if header.is_end_headers() {
523 let decoder = state.get_decoder(direction);
524 decoder.decode(header_block).ok()
525 } else {
526 state.continuation = Some((header.stream_id, header_block.to_vec(), direction));
528 None
529 };
530
531 let stream = state
533 .streams
534 .entry(header.stream_id)
535 .or_insert_with(|| Http2Stream::new(header.stream_id));
536
537 if stream.state == StreamState::Idle {
538 stream.state = StreamState::Open;
539 stream.request_frame = Some(frame_number);
540 }
541
542 if let Some(headers) = decoded_headers {
544 Self::process_headers(stream, &headers, direction, frame_number, fields);
545 }
546
547 if header.is_end_stream() {
548 stream.state = match stream.state {
549 StreamState::Open => StreamState::HalfClosedRemote,
550 StreamState::HalfClosedLocal => StreamState::Closed,
551 _ => StreamState::Closed,
552 };
553 }
554 }
555
556 fn process_headers(
557 stream: &mut Http2Stream,
558 headers: &[(Vec<u8>, Vec<u8>)],
559 direction: Direction,
560 frame_number: u64,
561 fields: &mut HashMap<&'static str, OwnedFieldValue>,
562 ) {
563 let mut header_strs = Vec::new();
564
565 for (name, value) in headers {
566 let name_str = String::from_utf8_lossy(name).to_string();
567 let value_str = String::from_utf8_lossy(value).to_string();
568
569 match name_str.as_str() {
571 ":method" => {
572 stream.method = Some(value_str.clone());
573 fields.insert(
574 "method",
575 FieldValue::OwnedString(CompactString::new(&value_str)),
576 );
577 }
578 ":path" => {
579 stream.path = Some(value_str.clone());
580 fields.insert(
581 "path",
582 FieldValue::OwnedString(CompactString::new(&value_str)),
583 );
584 }
585 ":authority" => {
586 stream.authority = Some(value_str.clone());
587 fields.insert(
588 "authority",
589 FieldValue::OwnedString(CompactString::new(&value_str)),
590 );
591 }
592 ":scheme" => {
593 stream.scheme = Some(value_str.clone());
594 fields.insert(
595 "scheme",
596 FieldValue::OwnedString(CompactString::new(&value_str)),
597 );
598 }
599 ":status" => {
600 if let Ok(status) = value_str.parse::<u16>() {
601 stream.status = Some(status);
602 stream.response_frame = Some(frame_number);
603 fields.insert("status", FieldValue::UInt16(status));
604 }
605 }
606 "content-type" => {
607 fields.insert(
608 "content_type",
609 FieldValue::OwnedString(CompactString::new(&value_str)),
610 );
611 }
612 "content-length" => {
613 if let Ok(len) = value_str.parse::<u64>() {
614 fields.insert("content_length", FieldValue::UInt64(len));
615 }
616 }
617 "user-agent" => {
618 fields.insert(
619 "user_agent",
620 FieldValue::OwnedString(CompactString::new(&value_str)),
621 );
622 }
623 _ => {}
624 }
625
626 if direction == Direction::ToServer || stream.status.is_none() {
628 stream
629 .request_headers
630 .push((name_str.clone(), value_str.clone()));
631 } else {
632 stream
633 .response_headers
634 .push((name_str.clone(), value_str.clone()));
635 }
636
637 header_strs.push(format!("{name_str}: {value_str}"));
638 }
639
640 if !header_strs.is_empty() {
642 let headers_str = header_strs.join("; ");
643 if direction == Direction::ToServer || stream.status.is_none() {
644 fields.insert(
645 "request_headers",
646 FieldValue::OwnedString(CompactString::new(&headers_str)),
647 );
648 } else {
649 fields.insert(
650 "response_headers",
651 FieldValue::OwnedString(CompactString::new(&headers_str)),
652 );
653 }
654 }
655 }
656
657 fn parse_priority_frame(payload: &[u8], fields: &mut HashMap<&'static str, OwnedFieldValue>) {
658 if payload.len() >= 5 {
659 let dep = u32::from_be_bytes([payload[0] & 0x7F, payload[1], payload[2], payload[3]]);
660 let exclusive = payload[0] & 0x80 != 0;
661 let weight = payload[4];
662
663 fields.insert("priority_exclusive", FieldValue::Bool(exclusive));
664 fields.insert("priority_dependency", FieldValue::UInt32(dep));
665 fields.insert("priority_weight", FieldValue::UInt8(weight));
666 }
667 }
668
669 fn parse_rst_stream_frame(
670 state: &mut Http2ConnectionState,
671 header: &FrameHeader,
672 payload: &[u8],
673 fields: &mut HashMap<&'static str, OwnedFieldValue>,
674 ) {
675 if payload.len() >= 4 {
676 let error_code = u32::from_be_bytes([payload[0], payload[1], payload[2], payload[3]]);
677 fields.insert("error_code", FieldValue::UInt32(error_code));
678 fields.insert("error_name", FieldValue::Str(error_codes::name(error_code)));
679 }
680
681 if let Some(stream) = state.streams.get_mut(&header.stream_id) {
683 stream.state = StreamState::Closed;
684 }
685 }
686
687 fn parse_settings_frame(
688 header: &FrameHeader,
689 payload: &[u8],
690 fields: &mut HashMap<&'static str, OwnedFieldValue>,
691 ) {
692 fields.insert("ack", FieldValue::Bool(header.is_ack()));
693
694 if !header.is_ack() {
695 let mut settings_strs = Vec::new();
696 let mut pos = 0;
697 while pos + 6 <= payload.len() {
698 let id = u16::from_be_bytes([payload[pos], payload[pos + 1]]);
699 let value = u32::from_be_bytes([
700 payload[pos + 2],
701 payload[pos + 3],
702 payload[pos + 4],
703 payload[pos + 5],
704 ]);
705 pos += 6;
706
707 settings_strs.push(format!("{}={}", settings::name(id), value));
708
709 match id {
711 settings::HEADER_TABLE_SIZE => {
712 fields.insert("header_table_size", FieldValue::UInt32(value));
713 }
714 settings::MAX_CONCURRENT_STREAMS => {
715 fields.insert("max_concurrent_streams", FieldValue::UInt32(value));
716 }
717 settings::INITIAL_WINDOW_SIZE => {
718 fields.insert("initial_window_size", FieldValue::UInt32(value));
719 }
720 settings::MAX_FRAME_SIZE => {
721 fields.insert("max_frame_size", FieldValue::UInt32(value));
722 }
723 _ => {}
724 }
725 }
726
727 if !settings_strs.is_empty() {
728 fields.insert(
729 "settings",
730 FieldValue::OwnedString(CompactString::new(settings_strs.join(", "))),
731 );
732 }
733 }
734 }
735
736 fn parse_push_promise_frame(
737 state: &mut Http2ConnectionState,
738 header: &FrameHeader,
739 payload: &[u8],
740 direction: Direction,
741 fields: &mut HashMap<&'static str, OwnedFieldValue>,
742 ) {
743 let mut offset = 0;
744 let mut pad_len = 0;
745
746 if header.is_padded() && !payload.is_empty() {
747 pad_len = payload[0] as usize;
748 offset += 1;
749 }
750
751 if payload.len() >= offset + 4 {
752 let promised_stream_id = u32::from_be_bytes([
753 payload[offset] & 0x7F,
754 payload[offset + 1],
755 payload[offset + 2],
756 payload[offset + 3],
757 ]);
758 offset += 4;
759
760 fields.insert("promised_stream_id", FieldValue::UInt32(promised_stream_id));
761
762 let header_block_end = payload.len().saturating_sub(pad_len);
763 let header_block = &payload[offset.min(header_block_end)..header_block_end];
764
765 if header.is_end_headers() {
766 let decoder = state.get_decoder(direction);
767 if let Ok(headers) = decoder.decode(header_block) {
768 let stream = state
769 .streams
770 .entry(promised_stream_id)
771 .or_insert_with(|| Http2Stream::new(promised_stream_id));
772 Self::process_headers(stream, &headers, direction, 0, fields);
773 }
774 }
775 }
776 }
777
778 fn parse_ping_frame(
779 header: &FrameHeader,
780 payload: &[u8],
781 fields: &mut HashMap<&'static str, OwnedFieldValue>,
782 ) {
783 fields.insert("ack", FieldValue::Bool(header.is_ack()));
784
785 if payload.len() >= 8 {
786 let mut data = [0u8; 8];
787 data.copy_from_slice(&payload[..8]);
788 fields.insert("ping_data", FieldValue::OwnedBytes(data.to_vec()));
789 }
790 }
791
792 fn parse_goaway_frame(payload: &[u8], fields: &mut HashMap<&'static str, OwnedFieldValue>) {
793 if payload.len() >= 8 {
794 let last_stream_id =
795 u32::from_be_bytes([payload[0] & 0x7F, payload[1], payload[2], payload[3]]);
796 let error_code = u32::from_be_bytes([payload[4], payload[5], payload[6], payload[7]]);
797
798 fields.insert("last_stream_id", FieldValue::UInt32(last_stream_id));
799 fields.insert("error_code", FieldValue::UInt32(error_code));
800 fields.insert("error_name", FieldValue::Str(error_codes::name(error_code)));
801
802 if payload.len() > 8 {
803 let debug_data = String::from_utf8_lossy(&payload[8..]).to_string();
804 fields.insert(
805 "debug_data",
806 FieldValue::OwnedString(CompactString::new(&debug_data)),
807 );
808 }
809 }
810 }
811
812 fn parse_window_update_frame(
813 payload: &[u8],
814 fields: &mut HashMap<&'static str, OwnedFieldValue>,
815 ) {
816 if payload.len() >= 4 {
817 let increment =
818 u32::from_be_bytes([payload[0] & 0x7F, payload[1], payload[2], payload[3]]);
819 fields.insert("window_increment", FieldValue::UInt32(increment));
820 }
821 }
822
823 fn parse_continuation_frame(
824 state: &mut Http2ConnectionState,
825 header: &FrameHeader,
826 payload: &[u8],
827 direction: Direction,
828 fields: &mut HashMap<&'static str, OwnedFieldValue>,
829 ) {
830 fields.insert("end_headers", FieldValue::Bool(header.is_end_headers()));
831
832 if let Some((stream_id, ref mut block, saved_dir)) = state.continuation.take() {
834 if stream_id == header.stream_id && saved_dir == direction {
835 block.extend_from_slice(payload);
836
837 if header.is_end_headers() {
838 let decoder = state.get_decoder(direction);
840 if let Ok(headers) = decoder.decode(block) {
841 let stream = state
842 .streams
843 .entry(header.stream_id)
844 .or_insert_with(|| Http2Stream::new(header.stream_id));
845 Self::process_headers(stream, &headers, direction, 0, fields);
846 }
847 } else {
848 state.continuation = Some((stream_id, block.clone(), saved_dir));
850 }
851 }
852 }
853 }
854}
855
856impl Default for Http2StreamParser {
857 fn default() -> Self {
858 Self::new()
859 }
860}
861
862impl StreamParser for Http2StreamParser {
863 fn name(&self) -> &'static str {
864 "http2"
865 }
866
867 fn display_name(&self) -> &'static str {
868 "HTTP/2"
869 }
870
871 fn can_parse_stream(&self, context: &StreamContext) -> bool {
872 if let Some(ref alpn) = context.alpn {
874 return alpn == "h2" || alpn == "h2c";
875 }
876 false
877 }
878
879 fn parse_stream(&self, data: &[u8], context: &StreamContext) -> StreamParseResult {
880 let mut connections = self.connections.lock().unwrap();
881 let state = connections
882 .entry(context.connection_id)
883 .or_insert_with(Http2ConnectionState::new);
884
885 let buffer_len_before = state.buffer.len();
887
888 state.buffer.extend_from_slice(data);
890
891 if context.direction == Direction::ToServer && !state.client_preface_seen {
893 if state.buffer.len() >= CONNECTION_PREFACE.len() {
894 if &state.buffer[..CONNECTION_PREFACE.len()] == CONNECTION_PREFACE {
895 state.buffer.advance(CONNECTION_PREFACE.len());
896 state.client_preface_seen = true;
897 } else {
898 return StreamParseResult::NotThisProtocol;
900 }
901 } else {
902 return StreamParseResult::NeedMore {
904 minimum_bytes: Some(CONNECTION_PREFACE.len()),
905 };
906 }
907 }
908
909 let mut messages = Vec::new();
910 let total_input = buffer_len_before + data.len();
913
914 loop {
916 if state.buffer.len() < FrameHeader::SIZE {
917 break; }
919
920 let header = FrameHeader::parse(&state.buffer).unwrap();
921 let total_frame_len = FrameHeader::SIZE + header.length as usize;
922
923 if state.buffer.len() < total_frame_len {
924 break; }
926
927 let payload = state.buffer[FrameHeader::SIZE..total_frame_len].to_vec();
928 state.buffer.advance(total_frame_len);
929
930 if context.direction == Direction::ToClient
932 && !state.server_preface_seen
933 && header.frame_type == FrameType::Settings
934 {
935 state.server_preface_seen = true;
936 }
937
938 let fields = Self::parse_frame(state, &header, &payload, context.direction, 0);
940
941 let message = ParsedMessage {
942 protocol: "http2",
943 connection_id: context.connection_id,
944 message_id: context.messages_parsed as u32 + messages.len() as u32,
945 direction: context.direction,
946 frame_number: 0, fields,
948 };
949 messages.push(message);
950 }
951
952 let total_consumed = total_input - state.buffer.len();
954
955 if !messages.is_empty() {
956 StreamParseResult::Complete {
957 messages,
958 bytes_consumed: total_consumed,
959 }
960 } else if total_consumed == 0 {
961 StreamParseResult::NeedMore {
962 minimum_bytes: Some(FrameHeader::SIZE),
963 }
964 } else {
965 StreamParseResult::Complete {
966 messages: vec![],
967 bytes_consumed: total_consumed,
968 }
969 }
970 }
971
972 fn message_schema(&self) -> Vec<FieldDescriptor> {
973 vec![
974 FieldDescriptor::new("connection_id", DataKind::UInt64),
976 FieldDescriptor::new("frame_type", DataKind::String),
977 FieldDescriptor::new("stream_id", DataKind::UInt32),
978 FieldDescriptor::new("flags", DataKind::UInt8),
979 FieldDescriptor::new("length", DataKind::UInt32),
980 FieldDescriptor::new("method", DataKind::String).set_nullable(true),
982 FieldDescriptor::new("path", DataKind::String).set_nullable(true),
983 FieldDescriptor::new("authority", DataKind::String).set_nullable(true),
984 FieldDescriptor::new("scheme", DataKind::String).set_nullable(true),
985 FieldDescriptor::new("status", DataKind::UInt16).set_nullable(true),
987 FieldDescriptor::new("request_headers", DataKind::String).set_nullable(true),
989 FieldDescriptor::new("response_headers", DataKind::String).set_nullable(true),
990 FieldDescriptor::new("content_type", DataKind::String).set_nullable(true),
992 FieldDescriptor::new("content_length", DataKind::UInt64).set_nullable(true),
993 FieldDescriptor::new("user_agent", DataKind::String).set_nullable(true),
994 FieldDescriptor::new("data_length", DataKind::UInt64).set_nullable(true),
996 FieldDescriptor::new("end_stream", DataKind::Bool).set_nullable(true),
997 FieldDescriptor::new("end_headers", DataKind::Bool).set_nullable(true),
998 FieldDescriptor::new("settings", DataKind::String).set_nullable(true),
1000 FieldDescriptor::new("ack", DataKind::Bool).set_nullable(true),
1001 FieldDescriptor::new("error_code", DataKind::UInt32).set_nullable(true),
1003 FieldDescriptor::new("error_name", DataKind::String).set_nullable(true),
1004 FieldDescriptor::new("last_stream_id", DataKind::UInt32).set_nullable(true),
1006 FieldDescriptor::new("debug_data", DataKind::String).set_nullable(true),
1007 FieldDescriptor::new("window_increment", DataKind::UInt32).set_nullable(true),
1009 FieldDescriptor::new("priority_exclusive", DataKind::Bool).set_nullable(true),
1011 FieldDescriptor::new("priority_dependency", DataKind::UInt32).set_nullable(true),
1012 FieldDescriptor::new("priority_weight", DataKind::UInt8).set_nullable(true),
1013 FieldDescriptor::new("promised_stream_id", DataKind::UInt32).set_nullable(true),
1015 FieldDescriptor::new("stream_state", DataKind::String).set_nullable(true),
1017 ]
1018 }
1019}
1020
1021#[cfg(test)]
1022mod tests {
1023 use super::*;
1024 use std::net::Ipv4Addr;
1025
1026 fn test_context() -> StreamContext {
1027 StreamContext {
1028 connection_id: 1,
1029 direction: Direction::ToServer,
1030 src_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
1031 dst_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
1032 src_port: 54321,
1033 dst_port: 443,
1034 bytes_parsed: 0,
1035 messages_parsed: 0,
1036 alpn: Some("h2".to_string()),
1037 }
1038 }
1039
1040 #[test]
1041 fn test_frame_type_from_u8() {
1042 assert_eq!(FrameType::from(0x0), FrameType::Data);
1043 assert_eq!(FrameType::from(0x1), FrameType::Headers);
1044 assert_eq!(FrameType::from(0x4), FrameType::Settings);
1045 assert_eq!(FrameType::from(0x7), FrameType::GoAway);
1046 assert!(matches!(FrameType::from(0xFF), FrameType::Unknown(0xFF)));
1047 }
1048
1049 #[test]
1050 fn test_frame_type_as_str() {
1051 assert_eq!(FrameType::Data.as_str(), "DATA");
1052 assert_eq!(FrameType::Headers.as_str(), "HEADERS");
1053 assert_eq!(FrameType::Settings.as_str(), "SETTINGS");
1054 assert_eq!(FrameType::Unknown(99).as_str(), "UNKNOWN");
1055 }
1056
1057 #[test]
1058 fn test_frame_header_parse() {
1059 let data = [0x00, 0x00, 0x06, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00];
1061 let header = FrameHeader::parse(&data).unwrap();
1062 assert_eq!(header.length, 6);
1063 assert_eq!(header.frame_type, FrameType::Settings);
1064 assert_eq!(header.flags, 0);
1065 assert_eq!(header.stream_id, 0);
1066 }
1067
1068 #[test]
1069 fn test_frame_header_parse_with_stream_id() {
1070 let data = [0x00, 0x00, 0x10, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01];
1072 let header = FrameHeader::parse(&data).unwrap();
1073 assert_eq!(header.length, 16);
1074 assert_eq!(header.frame_type, FrameType::Data);
1075 assert!(header.is_end_stream());
1076 assert_eq!(header.stream_id, 1);
1077 }
1078
1079 #[test]
1080 fn test_frame_header_flags() {
1081 let mut data = [0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01];
1082
1083 data[4] = flags::END_STREAM;
1085 let header = FrameHeader::parse(&data).unwrap();
1086 assert!(header.is_end_stream());
1087 assert!(!header.is_end_headers());
1088
1089 data[4] = flags::END_HEADERS;
1091 let header = FrameHeader::parse(&data).unwrap();
1092 assert!(header.is_end_headers());
1093 assert!(!header.is_end_stream());
1094
1095 data[4] = flags::PADDED;
1097 let header = FrameHeader::parse(&data).unwrap();
1098 assert!(header.is_padded());
1099
1100 data[4] = flags::PRIORITY;
1102 let header = FrameHeader::parse(&data).unwrap();
1103 assert!(header.is_priority());
1104 }
1105
1106 #[test]
1107 fn test_error_code_names() {
1108 assert_eq!(error_codes::name(error_codes::NO_ERROR), "NO_ERROR");
1109 assert_eq!(
1110 error_codes::name(error_codes::PROTOCOL_ERROR),
1111 "PROTOCOL_ERROR"
1112 );
1113 assert_eq!(error_codes::name(error_codes::CANCEL), "CANCEL");
1114 assert_eq!(error_codes::name(0xFFFF), "UNKNOWN");
1115 }
1116
1117 #[test]
1118 fn test_settings_names() {
1119 assert_eq!(
1120 settings::name(settings::HEADER_TABLE_SIZE),
1121 "HEADER_TABLE_SIZE"
1122 );
1123 assert_eq!(settings::name(settings::MAX_FRAME_SIZE), "MAX_FRAME_SIZE");
1124 assert_eq!(settings::name(0xFFFF), "UNKNOWN");
1125 }
1126
1127 #[test]
1128 fn test_http2_stream_state() {
1129 assert_eq!(StreamState::Idle.as_str(), "idle");
1130 assert_eq!(StreamState::Open.as_str(), "open");
1131 assert_eq!(StreamState::Closed.as_str(), "closed");
1132 }
1133
1134 #[test]
1135 fn test_http2_stream_new() {
1136 let stream = Http2Stream::new(1);
1137 assert_eq!(stream.stream_id, 1);
1138 assert_eq!(stream.state, StreamState::Idle);
1139 assert!(stream.method.is_none());
1140 assert!(stream.status.is_none());
1141 }
1142
1143 #[test]
1144 fn test_parser_can_parse_stream() {
1145 let parser = Http2StreamParser::new();
1146
1147 let mut ctx = test_context();
1149 ctx.alpn = Some("h2".to_string());
1150 assert!(parser.can_parse_stream(&ctx));
1151
1152 ctx.alpn = None;
1154 assert!(!parser.can_parse_stream(&ctx));
1155
1156 ctx.alpn = Some("http/1.1".to_string());
1158 assert!(!parser.can_parse_stream(&ctx));
1159 }
1160
/// Checks connection-preface handling: a truncated preface yields `NeedMore`,
/// while a full preface followed by one SETTINGS frame yields a single
/// message whose `frame_type` is `"SETTINGS"`.
#[test]
fn test_parse_connection_preface() {
    let ctx = test_context();

    // Case 1: only the first 10 bytes of the preface are available.
    let truncated_parser = Http2StreamParser::new();
    let truncated = &CONNECTION_PREFACE[..10];
    let outcome = truncated_parser.parse_stream(truncated, &ctx);
    assert!(matches!(outcome, StreamParseResult::NeedMore { .. }));

    // Case 2: complete preface followed by an empty SETTINGS frame
    // (length 0, type 0x4, flags 0x1, stream 0).
    let full_parser = Http2StreamParser::new();
    let settings_frame: [u8; 9] = [0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00];
    let data = [CONNECTION_PREFACE, &settings_frame[..]].concat();

    match full_parser.parse_stream(&data, &ctx) {
        StreamParseResult::Complete { messages, .. } => {
            assert_eq!(messages.len(), 1);
            let fields = &messages[0].fields;
            assert_eq!(fields.get("frame_type"), Some(&FieldValue::Str("SETTINGS")));
        }
        other => panic!("Expected Complete, got {:?}", other),
    }
}
1189
/// Parses a SETTINGS frame carrying two parameters and checks that both are
/// exposed as fields on the resulting message.
#[test]
fn test_parse_settings_frame() {
    let parser = Http2StreamParser::new();
    let mut ctx = test_context();
    // NOTE(review): presumably set so the parser does not expect the client
    // connection preface before the first frame — confirm against `parse_stream`.
    ctx.direction = Direction::ToClient;

    let frame = [
        0x00, 0x00, 0x0c, // length: 12 (two 6-byte settings entries)
        0x04, // type: SETTINGS
        0x00, // flags: none
        0x00, 0x00, 0x00, 0x00, // stream id: 0
        0x00, 0x01, 0x00, 0x00, 0x10, 0x00, // id 0x1 = 4096 (header_table_size)
        0x00, 0x03, 0x00, 0x00, 0x00, 0x64, // id 0x3 = 100 (max_concurrent_streams)
    ];

    let result = parser.parse_stream(&frame, &ctx);
    match result {
        StreamParseResult::Complete { messages, .. } => {
            assert_eq!(messages.len(), 1);
            assert_eq!(
                messages[0].fields.get("header_table_size"),
                Some(&FieldValue::UInt32(4096))
            );
            assert_eq!(
                messages[0].fields.get("max_concurrent_streams"),
                Some(&FieldValue::UInt32(100))
            );
        }
        // Include the actual result so a failure is diagnosable.
        _ => panic!("Expected Complete, got {:?}", result),
    }
}
1223
/// Parses a PING frame without the ACK flag and checks the reported
/// `frame_type` and `ack` fields.
#[test]
fn test_parse_ping_frame() {
    let parser = Http2StreamParser::new();
    let mut ctx = test_context();
    // NOTE(review): presumably set so the parser does not expect the client
    // connection preface before the first frame — confirm against `parse_stream`.
    ctx.direction = Direction::ToClient;

    let frame = [
        0x00, 0x00, 0x08, // length: 8
        0x06, // type: PING
        0x00, // flags: none (ACK bit clear)
        0x00, 0x00, 0x00, 0x00, // stream id: 0
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // 8-byte payload
    ];

    let result = parser.parse_stream(&frame, &ctx);
    match result {
        StreamParseResult::Complete { messages, .. } => {
            assert_eq!(messages.len(), 1);
            assert_eq!(
                messages[0].fields.get("frame_type"),
                Some(&FieldValue::Str("PING"))
            );
            assert_eq!(
                messages[0].fields.get("ack"),
                Some(&FieldValue::Bool(false))
            );
        }
        // Include the actual result so a failure is diagnosable.
        _ => panic!("Expected Complete, got {:?}", result),
    }
}
1255
/// Parses a WINDOW_UPDATE frame and checks the decoded `window_increment`.
#[test]
fn test_parse_window_update_frame() {
    let parser = Http2StreamParser::new();
    let mut ctx = test_context();
    // NOTE(review): presumably set so the parser does not expect the client
    // connection preface before the first frame — confirm against `parse_stream`.
    ctx.direction = Direction::ToClient;

    let frame = [
        0x00, 0x00, 0x04, // length: 4
        0x08, // type: WINDOW_UPDATE
        0x00, // flags: none
        0x00, 0x00, 0x00, 0x00, // stream id: 0
        0x00, 0x00, 0xff, 0xff, // window size increment: 65535
    ];

    let result = parser.parse_stream(&frame, &ctx);
    match result {
        StreamParseResult::Complete { messages, .. } => {
            assert_eq!(messages.len(), 1);
            assert_eq!(
                messages[0].fields.get("window_increment"),
                Some(&FieldValue::UInt32(65535))
            );
        }
        // Include the actual result so a failure is diagnosable.
        _ => panic!("Expected Complete, got {:?}", result),
    }
}
1283
/// Parses a GOAWAY frame and checks the decoded `last_stream_id` and the
/// symbolic `error_name`.
#[test]
fn test_parse_goaway_frame() {
    let parser = Http2StreamParser::new();
    let mut ctx = test_context();
    // NOTE(review): presumably set so the parser does not expect the client
    // connection preface before the first frame — confirm against `parse_stream`.
    ctx.direction = Direction::ToClient;

    let frame = [
        0x00, 0x00, 0x0b, // length: 11 (4 + 4 + 3 bytes of debug data)
        0x07, // type: GOAWAY
        0x00, // flags: none
        0x00, 0x00, 0x00, 0x00, // stream id: 0
        0x00, 0x00, 0x00, 0x01, // last stream id: 1
        0x00, 0x00, 0x00, 0x00, // error code: 0 (NO_ERROR)
        b'b', b'y', b'e', // additional debug data
    ];

    let result = parser.parse_stream(&frame, &ctx);
    match result {
        StreamParseResult::Complete { messages, .. } => {
            assert_eq!(messages.len(), 1);
            assert_eq!(
                messages[0].fields.get("last_stream_id"),
                Some(&FieldValue::UInt32(1))
            );
            assert_eq!(
                messages[0].fields.get("error_name"),
                Some(&FieldValue::Str("NO_ERROR"))
            );
        }
        // Include the actual result so a failure is diagnosable.
        _ => panic!("Expected Complete, got {:?}", result),
    }
}
1317
/// Parses a DATA frame with END_STREAM set and checks the reported
/// `data_length` and `end_stream` fields.
#[test]
fn test_parse_data_frame() {
    let parser = Http2StreamParser::new();
    let mut ctx = test_context();
    // NOTE(review): presumably set so the parser does not expect the client
    // connection preface before the first frame — confirm against `parse_stream`.
    ctx.direction = Direction::ToClient;

    let mut frame = vec![
        0x00, 0x00, 0x05, // length: 5
        0x00, // type: DATA
        0x01, // flags: END_STREAM
        0x00, 0x00, 0x00, 0x01, // stream id: 1
    ];
    frame.extend_from_slice(b"hello"); // 5-byte payload

    let result = parser.parse_stream(&frame, &ctx);
    match result {
        StreamParseResult::Complete { messages, .. } => {
            assert_eq!(messages.len(), 1);
            assert_eq!(
                messages[0].fields.get("data_length"),
                Some(&FieldValue::UInt64(5))
            );
            assert_eq!(
                messages[0].fields.get("end_stream"),
                Some(&FieldValue::Bool(true))
            );
        }
        // Include the actual result so a failure is diagnosable.
        _ => panic!("Expected Complete, got {:?}", result),
    }
}
1349
/// Parses an RST_STREAM frame and checks both the numeric `error_code` and
/// its symbolic `error_name`.
#[test]
fn test_parse_rst_stream_frame() {
    let parser = Http2StreamParser::new();
    let mut ctx = test_context();
    // NOTE(review): presumably set so the parser does not expect the client
    // connection preface before the first frame — confirm against `parse_stream`.
    ctx.direction = Direction::ToClient;

    let frame = [
        0x00, 0x00, 0x04, // length: 4
        0x03, // type: RST_STREAM
        0x00, // flags: none
        0x00, 0x00, 0x00, 0x01, // stream id: 1
        0x00, 0x00, 0x00, 0x08, // error code: 8 (CANCEL)
    ];

    let result = parser.parse_stream(&frame, &ctx);
    match result {
        StreamParseResult::Complete { messages, .. } => {
            assert_eq!(messages.len(), 1);
            assert_eq!(
                messages[0].fields.get("error_code"),
                Some(&FieldValue::UInt32(8))
            );
            assert_eq!(
                messages[0].fields.get("error_name"),
                Some(&FieldValue::Str("CANCEL"))
            );
        }
        // Include the actual result so a failure is diagnosable.
        _ => panic!("Expected Complete, got {:?}", result),
    }
}
1381
/// Checks that feeding only four bytes of input makes the parser report
/// `NeedMore`.
#[test]
fn test_incomplete_frame_needs_more() {
    let parser = Http2StreamParser::new();
    let mut ctx = test_context();
    ctx.direction = Direction::ToClient;

    // Four bytes: shorter than the 9-byte frames used elsewhere in these tests.
    let truncated = [0x00, 0x00, 0x10, 0x00];

    let outcome = parser.parse_stream(&truncated, &ctx);
    assert!(matches!(outcome, StreamParseResult::NeedMore { .. }));
}
1393
/// Feeds two back-to-back frames (SETTINGS, then PING) in one buffer and
/// checks that both are returned, in order.
#[test]
fn test_multiple_frames() {
    let parser = Http2StreamParser::new();
    let mut ctx = test_context();
    // NOTE(review): presumably set so the parser does not expect the client
    // connection preface before the first frame — confirm against `parse_stream`.
    ctx.direction = Direction::ToClient;

    let mut data = vec![
        // Frame 1: SETTINGS with the ACK flag and no payload.
        0x00, 0x00, 0x00, // length: 0
        0x04, // type: SETTINGS
        0x01, // flags: ACK
        0x00, 0x00, 0x00, 0x00, // stream id: 0
        // Frame 2: PING header; its payload is appended below.
        0x00, 0x00, 0x08, // length: 8
        0x06, // type: PING
        0x00, // flags: none
        0x00, 0x00, 0x00, 0x00, // stream id: 0
    ];
    data.extend_from_slice(&[0u8; 8]); // 8-byte PING payload

    let result = parser.parse_stream(&data, &ctx);
    match result {
        StreamParseResult::Complete { messages, .. } => {
            assert_eq!(messages.len(), 2);
            assert_eq!(
                messages[0].fields.get("frame_type"),
                Some(&FieldValue::Str("SETTINGS"))
            );
            assert_eq!(
                messages[1].fields.get("frame_type"),
                Some(&FieldValue::Str("PING"))
            );
        }
        // Include the actual result so a failure is diagnosable.
        _ => panic!("Expected Complete, got {:?}", result),
    }
}
1431}