1use std::collections::HashMap;
11
12use compact_str::CompactString;
13use httparse::{Request, Response, Status, EMPTY_HEADER};
14
15use crate::protocol::FieldValue;
16use crate::schema::{DataKind, FieldDescriptor};
17use crate::stream::{ParsedMessage, StreamContext, StreamParseResult, StreamParser};
18
/// Number of header slots allocated for each `httparse` request/response parse
/// attempt in `parse_stream`.
const MAX_HEADERS: usize = 100;
21
/// Stream parser for HTTP/1.x requests and responses.
///
/// The type is a field-less unit struct; all parsing is driven by the bytes
/// and the `StreamContext` passed to its `StreamParser` implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct HttpStreamParser;
25
26impl HttpStreamParser {
/// Creates a new HTTP stream parser.
pub fn new() -> Self {
    Self
}
30
31 fn extract_headers(
34 headers: &[httparse::Header],
35 fields: &mut HashMap<&'static str, FieldValue>,
36 ) {
37 for header in headers.iter().filter(|h| !h.name.is_empty()) {
38 let name = header.name;
39 let value = String::from_utf8_lossy(header.value).to_string();
40
41 if name.eq_ignore_ascii_case("host") {
42 fields.insert("host", FieldValue::OwnedString(CompactString::new(value)));
43 } else if name.eq_ignore_ascii_case("content-type") {
44 fields.insert(
45 "content_type",
46 FieldValue::OwnedString(CompactString::new(value)),
47 );
48 } else if name.eq_ignore_ascii_case("content-length") {
49 if let Ok(len) = value.parse::<u64>() {
50 fields.insert("content_length", FieldValue::UInt64(len));
51 }
52 } else if name.eq_ignore_ascii_case("user-agent") {
53 fields.insert(
54 "user_agent",
55 FieldValue::OwnedString(CompactString::new(value)),
56 );
57 } else if name.eq_ignore_ascii_case("server") {
58 fields.insert("server", FieldValue::OwnedString(CompactString::new(value)));
59 } else if name.eq_ignore_ascii_case("transfer-encoding") {
60 fields.insert(
61 "transfer_encoding",
62 FieldValue::OwnedString(CompactString::new(value)),
63 );
64 } else if name.eq_ignore_ascii_case("connection") {
65 fields.insert(
66 "connection",
67 FieldValue::OwnedString(CompactString::new(value)),
68 );
69 } else if name.eq_ignore_ascii_case("cookie") {
70 fields.insert("cookie", FieldValue::OwnedString(CompactString::new(value)));
71 } else if name.eq_ignore_ascii_case("set-cookie") {
72 fields
73 .entry("set_cookie")
74 .or_insert(FieldValue::OwnedString(CompactString::new(value)));
75 } else if name.eq_ignore_ascii_case("referer") || name.eq_ignore_ascii_case("referrer")
76 {
77 fields.insert(
78 "referer",
79 FieldValue::OwnedString(CompactString::new(value)),
80 );
81 } else if name.eq_ignore_ascii_case("accept") {
82 fields.insert("accept", FieldValue::OwnedString(CompactString::new(value)));
83 } else if name.eq_ignore_ascii_case("accept-encoding") {
84 fields.insert(
85 "accept_encoding",
86 FieldValue::OwnedString(CompactString::new(value)),
87 );
88 } else if name.eq_ignore_ascii_case("accept-language") {
89 fields.insert(
90 "accept_language",
91 FieldValue::OwnedString(CompactString::new(value)),
92 );
93 } else if name.eq_ignore_ascii_case("cache-control") {
94 fields.insert(
95 "cache_control",
96 FieldValue::OwnedString(CompactString::new(value)),
97 );
98 } else if name.eq_ignore_ascii_case("authorization") {
99 let auth_type = value.split_whitespace().next().unwrap_or(&value);
101 fields.insert(
102 "authorization",
103 FieldValue::OwnedString(CompactString::new(auth_type)),
104 );
105 } else if name.eq_ignore_ascii_case("location") {
106 fields.insert(
107 "location",
108 FieldValue::OwnedString(CompactString::new(value)),
109 );
110 } else if name.eq_ignore_ascii_case("x-forwarded-for") {
111 fields.insert(
112 "x_forwarded_for",
113 FieldValue::OwnedString(CompactString::new(value)),
114 );
115 } else if name.eq_ignore_ascii_case("x-real-ip") {
116 fields.insert(
117 "x_real_ip",
118 FieldValue::OwnedString(CompactString::new(value)),
119 );
120 }
121 }
123 }
124
125 fn is_chunked(fields: &HashMap<&'static str, FieldValue>) -> bool {
127 fields
128 .get("transfer_encoding")
129 .and_then(|v| v.as_str())
130 .map(|s| s.to_lowercase().contains("chunked"))
131 .unwrap_or(false)
132 }
133
134 fn get_content_length(fields: &HashMap<&'static str, FieldValue>) -> Option<usize> {
136 fields.get("content_length").and_then(|v| {
137 if let FieldValue::UInt64(len) = v {
138 Some(*len as usize)
139 } else {
140 None
141 }
142 })
143 }
144
/// Measures a complete chunked-transfer body located at the start of `data`.
///
/// Walks `chunk-size[;extensions] CRLF chunk-data CRLF` records until the
/// zero-size chunk, then consumes either the bare terminating CRLF or a
/// trailer section that ends in CRLF CRLF.
///
/// Returns `Some(n)` where `n` is the number of bytes of `data` occupied by
/// the body (bytes after it are not counted). Returns `None` when the body is
/// incomplete or malformed; the two cases are not distinguished.
///
/// The chunk size comes straight off the wire, so all offset arithmetic is
/// checked: an absurdly large size yields `None` instead of overflowing.
fn parse_chunked_body(data: &[u8]) -> Option<usize> {
    let mut pos: usize = 0;

    loop {
        // Locate the CRLF that ends the chunk-size line.
        let rest = data.get(pos..)?;
        let line_len = rest.windows(2).position(|w| w == b"\r\n")?;

        // The size is hexadecimal; anything after ';' is a chunk extension.
        let size_line = std::str::from_utf8(&rest[..line_len]).ok()?;
        let size_part = size_line.split(';').next().unwrap_or(size_line);
        let chunk_size = usize::from_str_radix(size_part.trim(), 16).ok()?;

        // `line_len + 2 <= rest.len()`, so this stays within `data`.
        pos += line_len + 2;

        if chunk_size == 0 {
            let tail = &data[pos..];
            // No trailers: the body ends with a bare CRLF.
            if tail.starts_with(b"\r\n") {
                return Some(pos + 2);
            }
            // Trailers present: the body ends at the first blank line.
            return tail
                .windows(4)
                .position(|w| w == b"\r\n\r\n")
                .map(|end| pos + end + 4);
        }

        // Checked so that a hostile chunk size cannot overflow `usize`.
        let data_end = pos.checked_add(chunk_size)?;
        let next = data_end.checked_add(2)?;

        // `get` fails when the chunk data and its CRLF have not fully arrived.
        if data.get(data_end..next)? != b"\r\n" {
            return None;
        }

        pos = next;
    }
}
188}
189
190impl StreamParser for HttpStreamParser {
/// Returns the lowercase identifier of this parser, `"http"`.
fn name(&self) -> &'static str {
    "http"
}
194
/// Returns the human-readable name of this parser, `"HTTP"`.
fn display_name(&self) -> &'static str {
    "HTTP"
}
198
199 fn can_parse_stream(&self, context: &StreamContext) -> bool {
200 const HTTP_PORTS: [u16; 6] = [80, 8080, 8000, 8888, 3000, 5000];
202 HTTP_PORTS.contains(&context.dst_port) || HTTP_PORTS.contains(&context.src_port)
203 }
204
205 fn parse_stream(&self, data: &[u8], context: &StreamContext) -> StreamParseResult {
206 if data.len() < 16 {
208 return StreamParseResult::NeedMore {
209 minimum_bytes: Some(16),
210 };
211 }
212
213 let mut fields = HashMap::new();
214
215 let mut headers = [EMPTY_HEADER; MAX_HEADERS];
217 let mut req = Request::new(&mut headers);
218
219 match req.parse(data) {
220 Ok(Status::Complete(header_len)) => {
221 fields.insert("is_request", FieldValue::Bool(true));
222
223 if let Some(method) = req.method {
224 fields.insert(
225 "method",
226 FieldValue::OwnedString(CompactString::new(method)),
227 );
228 }
229
230 if let Some(path) = req.path {
231 fields.insert("uri", FieldValue::OwnedString(CompactString::new(path)));
232 }
233
234 if let Some(version) = req.version {
235 let version_str = format!("HTTP/1.{version}");
236 fields.insert(
237 "http_version",
238 FieldValue::OwnedString(CompactString::new(version_str)),
239 );
240 }
241
242 Self::extract_headers(&headers, &mut fields);
243
244 let body_start = header_len;
246 let body_length = if Self::is_chunked(&fields) {
247 match Self::parse_chunked_body(&data[body_start..]) {
248 Some(len) => len,
249 None => {
250 return StreamParseResult::NeedMore {
251 minimum_bytes: None,
252 }
253 }
254 }
255 } else if let Some(content_length) = Self::get_content_length(&fields) {
256 let needed = body_start + content_length;
257 if data.len() < needed {
258 return StreamParseResult::NeedMore {
259 minimum_bytes: Some(needed),
260 };
261 }
262 content_length
263 } else {
264 0
266 };
267
268 let total_length = body_start + body_length;
269
270 let message = ParsedMessage {
271 protocol: "http",
272 connection_id: context.connection_id,
273 message_id: context.messages_parsed as u32,
274 direction: context.direction,
275 frame_number: 0,
276 fields,
277 };
278
279 return StreamParseResult::Complete {
280 messages: vec![message],
281 bytes_consumed: total_length,
282 };
283 }
284 Ok(Status::Partial) => {
285 return StreamParseResult::NeedMore {
287 minimum_bytes: None,
288 };
289 }
290 Err(_) => {
291 }
293 }
294
295 let mut headers = [EMPTY_HEADER; MAX_HEADERS];
297 let mut resp = Response::new(&mut headers);
298
299 match resp.parse(data) {
300 Ok(Status::Complete(header_len)) => {
301 fields.insert("is_request", FieldValue::Bool(false));
302
303 if let Some(version) = resp.version {
304 let version_str = format!("HTTP/1.{version}");
305 fields.insert(
306 "http_version",
307 FieldValue::OwnedString(CompactString::new(version_str)),
308 );
309 }
310
311 if let Some(code) = resp.code {
312 fields.insert("status_code", FieldValue::UInt16(code));
313 }
314
315 if let Some(reason) = resp.reason {
316 fields.insert(
317 "status_text",
318 FieldValue::OwnedString(CompactString::new(reason)),
319 );
320 }
321
322 Self::extract_headers(&headers, &mut fields);
323
324 let body_start = header_len;
326 let body_length = if Self::is_chunked(&fields) {
327 match Self::parse_chunked_body(&data[body_start..]) {
328 Some(len) => len,
329 None => {
330 return StreamParseResult::NeedMore {
331 minimum_bytes: None,
332 }
333 }
334 }
335 } else if let Some(content_length) = Self::get_content_length(&fields) {
336 let needed = body_start + content_length;
337 if data.len() < needed {
338 return StreamParseResult::NeedMore {
339 minimum_bytes: Some(needed),
340 };
341 }
342 content_length
343 } else {
344 0
348 };
349
350 let total_length = body_start + body_length;
351
352 let message = ParsedMessage {
353 protocol: "http",
354 connection_id: context.connection_id,
355 message_id: context.messages_parsed as u32,
356 direction: context.direction,
357 frame_number: 0,
358 fields,
359 };
360
361 return StreamParseResult::Complete {
362 messages: vec![message],
363 bytes_consumed: total_length,
364 };
365 }
366 Ok(Status::Partial) => {
367 return StreamParseResult::NeedMore {
368 minimum_bytes: None,
369 };
370 }
371 Err(_) => {
372 if data.starts_with(b"GET ")
374 || data.starts_with(b"POST ")
375 || data.starts_with(b"PUT ")
376 || data.starts_with(b"DELETE ")
377 || data.starts_with(b"HEAD ")
378 || data.starts_with(b"OPTIONS ")
379 || data.starts_with(b"PATCH ")
380 || data.starts_with(b"CONNECT ")
381 || data.starts_with(b"HTTP/")
382 {
383 return StreamParseResult::NeedMore {
385 minimum_bytes: None,
386 };
387 }
388 }
389 }
390
391 StreamParseResult::NotThisProtocol
393 }
394
395 fn message_schema(&self) -> Vec<FieldDescriptor> {
396 vec![
397 FieldDescriptor::new("connection_id", DataKind::UInt64),
398 FieldDescriptor::new("message_id", DataKind::UInt32),
399 FieldDescriptor::new("direction", DataKind::String).set_nullable(true),
400 FieldDescriptor::new("is_request", DataKind::Bool).set_nullable(true),
401 FieldDescriptor::new("method", DataKind::String).set_nullable(true),
402 FieldDescriptor::new("uri", DataKind::String).set_nullable(true),
403 FieldDescriptor::new("http_version", DataKind::String).set_nullable(true),
404 FieldDescriptor::new("status_code", DataKind::UInt16).set_nullable(true),
405 FieldDescriptor::new("status_text", DataKind::String).set_nullable(true),
406 FieldDescriptor::new("host", DataKind::String).set_nullable(true),
407 FieldDescriptor::new("content_type", DataKind::String).set_nullable(true),
408 FieldDescriptor::new("content_length", DataKind::UInt64).set_nullable(true),
409 FieldDescriptor::new("user_agent", DataKind::String).set_nullable(true),
410 FieldDescriptor::new("server", DataKind::String).set_nullable(true),
411 FieldDescriptor::new("transfer_encoding", DataKind::String).set_nullable(true),
412 FieldDescriptor::new("connection", DataKind::String).set_nullable(true),
413 FieldDescriptor::new("cookie", DataKind::String).set_nullable(true),
414 FieldDescriptor::new("set_cookie", DataKind::String).set_nullable(true),
415 FieldDescriptor::new("referer", DataKind::String).set_nullable(true),
416 FieldDescriptor::new("accept", DataKind::String).set_nullable(true),
417 FieldDescriptor::new("accept_encoding", DataKind::String).set_nullable(true),
418 FieldDescriptor::new("accept_language", DataKind::String).set_nullable(true),
419 FieldDescriptor::new("cache_control", DataKind::String).set_nullable(true),
420 FieldDescriptor::new("authorization", DataKind::String).set_nullable(true),
421 FieldDescriptor::new("location", DataKind::String).set_nullable(true),
422 FieldDescriptor::new("x_forwarded_for", DataKind::String).set_nullable(true),
423 FieldDescriptor::new("x_real_ip", DataKind::String).set_nullable(true),
424 ]
425 }
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::Direction;
    use std::net::{IpAddr, Ipv4Addr};

    /// Unwraps a `StreamParseResult::Complete` into `(messages, bytes_consumed)`.
    ///
    /// Panics with the supplied message (default: "Expected Complete") for any
    /// other variant.
    macro_rules! expect_complete {
        ($result:expr) => {
            expect_complete!($result, "Expected Complete")
        };
        ($result:expr, $failure:literal) => {
            match $result {
                StreamParseResult::Complete {
                    messages,
                    bytes_consumed,
                } => (messages, bytes_consumed),
                _ => panic!("{}", $failure),
            }
        };
    }

    /// Client-to-server context on destination port 80 with zeroed counters.
    fn test_context() -> StreamContext {
        StreamContext {
            connection_id: 1,
            direction: Direction::ToServer,
            src_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
            dst_ip: IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
            src_port: 54321,
            dst_port: 80,
            bytes_parsed: 0,
            messages_parsed: 0,
            alpn: None,
        }
    }

    /// Same as `test_context`, but with the direction flipped to the client.
    fn response_context() -> StreamContext {
        let mut ctx = test_context();
        ctx.direction = Direction::ToClient;
        ctx
    }

    /// Runs a freshly constructed parser over `data`.
    fn parse(data: &[u8], ctx: &StreamContext) -> StreamParseResult {
        HttpStreamParser::new().parse_stream(data, ctx)
    }

    /// Builds the owned-string field value the parser emits for text fields.
    fn text(value: &str) -> FieldValue {
        FieldValue::OwnedString(CompactString::new(value))
    }

    #[test]
    fn test_simple_get_request() {
        let data = b"GET /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n";

        let (messages, bytes_consumed) = expect_complete!(parse(data, &test_context()));

        assert_eq!(bytes_consumed, data.len());
        assert_eq!(messages.len(), 1);
        let msg = &messages[0];
        assert_eq!(msg.fields.get("method"), Some(&text("GET")));
        assert_eq!(msg.fields.get("uri"), Some(&text("/index.html")));
    }

    #[test]
    fn test_post_with_body() {
        let body = r#"{"key": "value"}"#;
        let request = format!(
            "POST /api HTTP/1.1\r\nHost: api.example.com\r\nContent-Length: {}\r\n\r\n{}",
            body.len(),
            body
        );

        let (messages, bytes_consumed) =
            expect_complete!(parse(request.as_bytes(), &test_context()));

        assert_eq!(bytes_consumed, request.len());
        assert_eq!(messages[0].fields.get("method"), Some(&text("POST")));
    }

    #[test]
    fn test_response_with_content_length() {
        let body = "<html>Hello</html>";
        let response = format!(
            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/html\r\n\r\n{}",
            body.len(),
            body
        );

        let (messages, _) = expect_complete!(parse(response.as_bytes(), &response_context()));

        assert_eq!(
            messages[0].fields.get("status_code"),
            Some(&FieldValue::UInt16(200))
        );
        assert_eq!(
            messages[0].fields.get("is_request"),
            Some(&FieldValue::Bool(false))
        );
    }

    #[test]
    fn test_chunked_encoding() {
        let response =
            "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nHello\r\n0\r\n\r\n";

        let (_, bytes_consumed) =
            expect_complete!(parse(response.as_bytes(), &response_context()));

        assert_eq!(bytes_consumed, response.len());
    }

    #[test]
    fn test_chunked_with_extensions() {
        let response =
            "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5;name=value\r\nHello\r\n0\r\n\r\n";

        let (_, bytes_consumed) =
            expect_complete!(parse(response.as_bytes(), &response_context()));

        assert_eq!(bytes_consumed, response.len());
    }

    #[test]
    fn test_keepalive_multiple_requests() {
        let requests = "GET /page1 HTTP/1.1\r\nHost: example.com\r\n\r\nGET /page2 HTTP/1.1\r\nHost: example.com\r\n\r\n";
        let bytes = requests.as_bytes();

        let (first, bytes_consumed) = expect_complete!(parse(bytes, &test_context()));
        assert_eq!(first[0].fields.get("uri"), Some(&text("/page1")));

        // Resume right after the first message, as a caller would.
        let (second, _) = expect_complete!(
            parse(&bytes[bytes_consumed..], &test_context()),
            "Expected Complete for second request"
        );
        assert_eq!(second[0].fields.get("uri"), Some(&text("/page2")));
    }

    #[test]
    fn test_incomplete_header() {
        let partial = b"GET /index.html HTTP/1.1\r\nHost: exam";

        let result = parse(partial, &test_context());

        assert!(
            matches!(result, StreamParseResult::NeedMore { .. }),
            "Expected NeedMore"
        );
    }

    #[test]
    fn test_incomplete_body() {
        let partial = "POST /api HTTP/1.1\r\nContent-Length: 100\r\n\r\npartial";

        match parse(partial.as_bytes(), &test_context()) {
            StreamParseResult::NeedMore { minimum_bytes } => assert!(minimum_bytes.is_some()),
            _ => panic!("Expected NeedMore"),
        }
    }

    #[test]
    fn test_not_http() {
        let garbage = b"NOTHTTP random garbage data\x00\x01\x02";

        let result = parse(garbage, &test_context());

        assert!(
            matches!(result, StreamParseResult::NotThisProtocol),
            "Expected NotThisProtocol"
        );
    }

    #[test]
    fn test_response_with_all_headers() {
        let response = "HTTP/1.1 302 Found\r\n\
                        Server: nginx/1.18.0\r\n\
                        Content-Type: text/html\r\n\
                        Content-Length: 0\r\n\
                        Location: https://example.com/new\r\n\
                        Set-Cookie: session=abc123; HttpOnly\r\n\
                        Cache-Control: no-cache\r\n\
                        \r\n";

        let (messages, _) = expect_complete!(parse(response.as_bytes(), &response_context()));
        let msg = &messages[0];

        assert_eq!(
            msg.fields.get("status_code"),
            Some(&FieldValue::UInt16(302))
        );
        assert_eq!(
            msg.fields.get("location"),
            Some(&text("https://example.com/new"))
        );
        assert!(msg.fields.get("set_cookie").is_some());
        assert_eq!(msg.fields.get("cache_control"), Some(&text("no-cache")));
    }

    #[test]
    fn test_request_with_cookie() {
        let request = "GET /api HTTP/1.1\r\n\
                       Host: api.example.com\r\n\
                       Cookie: session=xyz789; user=john\r\n\
                       Authorization: Bearer token123\r\n\
                       X-Forwarded-For: 10.0.0.1\r\n\
                       \r\n";

        let (messages, _) = expect_complete!(parse(request.as_bytes(), &test_context()));
        let msg = &messages[0];

        assert_eq!(
            msg.fields.get("cookie"),
            Some(&text("session=xyz789; user=john"))
        );
        // Only the scheme is recorded for Authorization.
        assert_eq!(msg.fields.get("authorization"), Some(&text("Bearer")));
        assert_eq!(msg.fields.get("x_forwarded_for"), Some(&text("10.0.0.1")));
    }

    #[test]
    fn test_http10_request() {
        let request = "GET / HTTP/1.0\r\n\r\n";

        let (messages, _) = expect_complete!(parse(request.as_bytes(), &test_context()));

        assert_eq!(
            messages[0].fields.get("http_version"),
            Some(&text("HTTP/1.0"))
        );
    }

    #[test]
    fn test_head_request() {
        let request = "HEAD /status HTTP/1.1\r\nHost: example.com\r\n\r\n";

        let (messages, _) = expect_complete!(parse(request.as_bytes(), &test_context()));

        assert_eq!(messages[0].fields.get("method"), Some(&text("HEAD")));
    }

    #[test]
    fn test_options_request() {
        let request = "OPTIONS * HTTP/1.1\r\nHost: example.com\r\n\r\n";

        let (messages, _) = expect_complete!(parse(request.as_bytes(), &test_context()));

        assert_eq!(messages[0].fields.get("method"), Some(&text("OPTIONS")));
    }

    #[test]
    fn test_100_continue_response() {
        let response = "HTTP/1.1 100 Continue\r\n\r\n";

        let (messages, _) = expect_complete!(parse(response.as_bytes(), &response_context()));

        assert_eq!(
            messages[0].fields.get("status_code"),
            Some(&FieldValue::UInt16(100))
        );
    }
}