pcapsql_core/stream/parsers/
http.rs

1//! HTTP/1.x stream parser using httparse for zero-copy parsing.
2//!
3//! This parser handles HTTP messages that span multiple TCP packets through
4//! stream reassembly. It supports:
5//! - HTTP/1.0 and HTTP/1.1 requests and responses
6//! - Chunked transfer encoding
7//! - Content-Length based body parsing
8//! - Keep-alive connections with multiple messages per stream
9
10use std::collections::HashMap;
11
12use compact_str::CompactString;
13use httparse::{Request, Response, Status, EMPTY_HEADER};
14
15use crate::protocol::FieldValue;
16use crate::schema::{DataKind, FieldDescriptor};
17use crate::stream::{ParsedMessage, StreamContext, StreamParseResult, StreamParser};
18
19/// Maximum number of headers to parse per message.
20const MAX_HEADERS: usize = 100;
21
22/// HTTP/1.x stream parser.
23#[derive(Debug, Clone, Copy, Default)]
24pub struct HttpStreamParser;
25
26impl HttpStreamParser {
27    pub fn new() -> Self {
28        Self
29    }
30
31    /// Extract headers from httparse header array into fields map.
32    /// Uses eq_ignore_ascii_case() for zero-allocation case-insensitive matching.
33    fn extract_headers(
34        headers: &[httparse::Header],
35        fields: &mut HashMap<&'static str, FieldValue>,
36    ) {
37        for header in headers.iter().filter(|h| !h.name.is_empty()) {
38            let name = header.name;
39            let value = String::from_utf8_lossy(header.value).to_string();
40
41            if name.eq_ignore_ascii_case("host") {
42                fields.insert("host", FieldValue::OwnedString(CompactString::new(value)));
43            } else if name.eq_ignore_ascii_case("content-type") {
44                fields.insert(
45                    "content_type",
46                    FieldValue::OwnedString(CompactString::new(value)),
47                );
48            } else if name.eq_ignore_ascii_case("content-length") {
49                if let Ok(len) = value.parse::<u64>() {
50                    fields.insert("content_length", FieldValue::UInt64(len));
51                }
52            } else if name.eq_ignore_ascii_case("user-agent") {
53                fields.insert(
54                    "user_agent",
55                    FieldValue::OwnedString(CompactString::new(value)),
56                );
57            } else if name.eq_ignore_ascii_case("server") {
58                fields.insert("server", FieldValue::OwnedString(CompactString::new(value)));
59            } else if name.eq_ignore_ascii_case("transfer-encoding") {
60                fields.insert(
61                    "transfer_encoding",
62                    FieldValue::OwnedString(CompactString::new(value)),
63                );
64            } else if name.eq_ignore_ascii_case("connection") {
65                fields.insert(
66                    "connection",
67                    FieldValue::OwnedString(CompactString::new(value)),
68                );
69            } else if name.eq_ignore_ascii_case("cookie") {
70                fields.insert("cookie", FieldValue::OwnedString(CompactString::new(value)));
71            } else if name.eq_ignore_ascii_case("set-cookie") {
72                fields
73                    .entry("set_cookie")
74                    .or_insert(FieldValue::OwnedString(CompactString::new(value)));
75            } else if name.eq_ignore_ascii_case("referer") || name.eq_ignore_ascii_case("referrer")
76            {
77                fields.insert(
78                    "referer",
79                    FieldValue::OwnedString(CompactString::new(value)),
80                );
81            } else if name.eq_ignore_ascii_case("accept") {
82                fields.insert("accept", FieldValue::OwnedString(CompactString::new(value)));
83            } else if name.eq_ignore_ascii_case("accept-encoding") {
84                fields.insert(
85                    "accept_encoding",
86                    FieldValue::OwnedString(CompactString::new(value)),
87                );
88            } else if name.eq_ignore_ascii_case("accept-language") {
89                fields.insert(
90                    "accept_language",
91                    FieldValue::OwnedString(CompactString::new(value)),
92                );
93            } else if name.eq_ignore_ascii_case("cache-control") {
94                fields.insert(
95                    "cache_control",
96                    FieldValue::OwnedString(CompactString::new(value)),
97                );
98            } else if name.eq_ignore_ascii_case("authorization") {
99                // Store auth type only for security
100                let auth_type = value.split_whitespace().next().unwrap_or(&value);
101                fields.insert(
102                    "authorization",
103                    FieldValue::OwnedString(CompactString::new(auth_type)),
104                );
105            } else if name.eq_ignore_ascii_case("location") {
106                fields.insert(
107                    "location",
108                    FieldValue::OwnedString(CompactString::new(value)),
109                );
110            } else if name.eq_ignore_ascii_case("x-forwarded-for") {
111                fields.insert(
112                    "x_forwarded_for",
113                    FieldValue::OwnedString(CompactString::new(value)),
114                );
115            } else if name.eq_ignore_ascii_case("x-real-ip") {
116                fields.insert(
117                    "x_real_ip",
118                    FieldValue::OwnedString(CompactString::new(value)),
119                );
120            }
121            // Unknown headers are ignored
122        }
123    }
124
125    /// Check if transfer encoding is chunked.
126    fn is_chunked(fields: &HashMap<&'static str, FieldValue>) -> bool {
127        fields
128            .get("transfer_encoding")
129            .and_then(|v| v.as_str())
130            .map(|s| s.to_lowercase().contains("chunked"))
131            .unwrap_or(false)
132    }
133
134    /// Get Content-Length from fields.
135    fn get_content_length(fields: &HashMap<&'static str, FieldValue>) -> Option<usize> {
136        fields.get("content_length").and_then(|v| {
137            if let FieldValue::UInt64(len) = v {
138                Some(*len as usize)
139            } else {
140                None
141            }
142        })
143    }
144
145    /// Parse chunked body and return total length consumed.
146    fn parse_chunked_body(data: &[u8]) -> Option<usize> {
147        let mut pos = 0;
148
149        loop {
150            // Find chunk size line ending
151            let line_end = data[pos..]
152                .windows(2)
153                .position(|w| w == b"\r\n")
154                .map(|p| pos + p)?;
155
156            let size_str = std::str::from_utf8(&data[pos..line_end]).ok()?;
157            // Handle chunk extensions (anything after semicolon)
158            let size_part = size_str.split(';').next().unwrap_or(size_str);
159            let chunk_size = usize::from_str_radix(size_part.trim(), 16).ok()?;
160
161            pos = line_end + 2; // Skip \r\n
162
163            if chunk_size == 0 {
164                // Final chunk - need trailing \r\n (and optional trailers)
165                if data.len() >= pos + 2 && &data[pos..pos + 2] == b"\r\n" {
166                    return Some(pos + 2);
167                }
168                // Might have trailers, look for \r\n\r\n
169                if let Some(end) = data[pos..].windows(4).position(|w| w == b"\r\n\r\n") {
170                    return Some(pos + end + 4);
171                }
172                return None; // Need more data
173            }
174
175            // Need chunk data + trailing \r\n
176            if data.len() < pos + chunk_size + 2 {
177                return None;
178            }
179
180            // Verify trailing \r\n after chunk data
181            if &data[pos + chunk_size..pos + chunk_size + 2] != b"\r\n" {
182                return None; // Malformed
183            }
184
185            pos += chunk_size + 2;
186        }
187    }
188}
189
190impl StreamParser for HttpStreamParser {
191    fn name(&self) -> &'static str {
192        "http"
193    }
194
195    fn display_name(&self) -> &'static str {
196        "HTTP"
197    }
198
199    fn can_parse_stream(&self, context: &StreamContext) -> bool {
200        // Common HTTP ports
201        const HTTP_PORTS: [u16; 6] = [80, 8080, 8000, 8888, 3000, 5000];
202        HTTP_PORTS.contains(&context.dst_port) || HTTP_PORTS.contains(&context.src_port)
203    }
204
205    fn parse_stream(&self, data: &[u8], context: &StreamContext) -> StreamParseResult {
206        // Need at least a few bytes to determine message type
207        if data.len() < 16 {
208            return StreamParseResult::NeedMore {
209                minimum_bytes: Some(16),
210            };
211        }
212
213        let mut fields = HashMap::new();
214
215        // Try to parse as HTTP request first
216        let mut headers = [EMPTY_HEADER; MAX_HEADERS];
217        let mut req = Request::new(&mut headers);
218
219        match req.parse(data) {
220            Ok(Status::Complete(header_len)) => {
221                fields.insert("is_request", FieldValue::Bool(true));
222
223                if let Some(method) = req.method {
224                    fields.insert(
225                        "method",
226                        FieldValue::OwnedString(CompactString::new(method)),
227                    );
228                }
229
230                if let Some(path) = req.path {
231                    fields.insert("uri", FieldValue::OwnedString(CompactString::new(path)));
232                }
233
234                if let Some(version) = req.version {
235                    let version_str = format!("HTTP/1.{version}");
236                    fields.insert(
237                        "http_version",
238                        FieldValue::OwnedString(CompactString::new(version_str)),
239                    );
240                }
241
242                Self::extract_headers(&headers, &mut fields);
243
244                // Calculate body length
245                let body_start = header_len;
246                let body_length = if Self::is_chunked(&fields) {
247                    match Self::parse_chunked_body(&data[body_start..]) {
248                        Some(len) => len,
249                        None => {
250                            return StreamParseResult::NeedMore {
251                                minimum_bytes: None,
252                            }
253                        }
254                    }
255                } else if let Some(content_length) = Self::get_content_length(&fields) {
256                    let needed = body_start + content_length;
257                    if data.len() < needed {
258                        return StreamParseResult::NeedMore {
259                            minimum_bytes: Some(needed),
260                        };
261                    }
262                    content_length
263                } else {
264                    // No body for requests without Content-Length or chunked encoding
265                    0
266                };
267
268                let total_length = body_start + body_length;
269
270                let message = ParsedMessage {
271                    protocol: "http",
272                    connection_id: context.connection_id,
273                    message_id: context.messages_parsed as u32,
274                    direction: context.direction,
275                    frame_number: 0,
276                    fields,
277                };
278
279                return StreamParseResult::Complete {
280                    messages: vec![message],
281                    bytes_consumed: total_length,
282                };
283            }
284            Ok(Status::Partial) => {
285                // Headers not complete yet
286                return StreamParseResult::NeedMore {
287                    minimum_bytes: None,
288                };
289            }
290            Err(_) => {
291                // Not a request, try response
292            }
293        }
294
295        // Try to parse as HTTP response
296        let mut headers = [EMPTY_HEADER; MAX_HEADERS];
297        let mut resp = Response::new(&mut headers);
298
299        match resp.parse(data) {
300            Ok(Status::Complete(header_len)) => {
301                fields.insert("is_request", FieldValue::Bool(false));
302
303                if let Some(version) = resp.version {
304                    let version_str = format!("HTTP/1.{version}");
305                    fields.insert(
306                        "http_version",
307                        FieldValue::OwnedString(CompactString::new(version_str)),
308                    );
309                }
310
311                if let Some(code) = resp.code {
312                    fields.insert("status_code", FieldValue::UInt16(code));
313                }
314
315                if let Some(reason) = resp.reason {
316                    fields.insert(
317                        "status_text",
318                        FieldValue::OwnedString(CompactString::new(reason)),
319                    );
320                }
321
322                Self::extract_headers(&headers, &mut fields);
323
324                // Calculate body length
325                let body_start = header_len;
326                let body_length = if Self::is_chunked(&fields) {
327                    match Self::parse_chunked_body(&data[body_start..]) {
328                        Some(len) => len,
329                        None => {
330                            return StreamParseResult::NeedMore {
331                                minimum_bytes: None,
332                            }
333                        }
334                    }
335                } else if let Some(content_length) = Self::get_content_length(&fields) {
336                    let needed = body_start + content_length;
337                    if data.len() < needed {
338                        return StreamParseResult::NeedMore {
339                            minimum_bytes: Some(needed),
340                        };
341                    }
342                    content_length
343                } else {
344                    // For responses without Content-Length, this is tricky
345                    // In HTTP/1.0, connection close signals end of response
346                    // For now, assume no body if neither is present
347                    0
348                };
349
350                let total_length = body_start + body_length;
351
352                let message = ParsedMessage {
353                    protocol: "http",
354                    connection_id: context.connection_id,
355                    message_id: context.messages_parsed as u32,
356                    direction: context.direction,
357                    frame_number: 0,
358                    fields,
359                };
360
361                return StreamParseResult::Complete {
362                    messages: vec![message],
363                    bytes_consumed: total_length,
364                };
365            }
366            Ok(Status::Partial) => {
367                return StreamParseResult::NeedMore {
368                    minimum_bytes: None,
369                };
370            }
371            Err(_) => {
372                // Check if it looks like HTTP at all
373                if data.starts_with(b"GET ")
374                    || data.starts_with(b"POST ")
375                    || data.starts_with(b"PUT ")
376                    || data.starts_with(b"DELETE ")
377                    || data.starts_with(b"HEAD ")
378                    || data.starts_with(b"OPTIONS ")
379                    || data.starts_with(b"PATCH ")
380                    || data.starts_with(b"CONNECT ")
381                    || data.starts_with(b"HTTP/")
382                {
383                    // Looks like HTTP but parsing failed - need more data?
384                    return StreamParseResult::NeedMore {
385                        minimum_bytes: None,
386                    };
387                }
388            }
389        }
390
391        // Not HTTP
392        StreamParseResult::NotThisProtocol
393    }
394
395    fn message_schema(&self) -> Vec<FieldDescriptor> {
396        vec![
397            FieldDescriptor::new("connection_id", DataKind::UInt64),
398            FieldDescriptor::new("message_id", DataKind::UInt32),
399            FieldDescriptor::new("direction", DataKind::String).set_nullable(true),
400            FieldDescriptor::new("is_request", DataKind::Bool).set_nullable(true),
401            FieldDescriptor::new("method", DataKind::String).set_nullable(true),
402            FieldDescriptor::new("uri", DataKind::String).set_nullable(true),
403            FieldDescriptor::new("http_version", DataKind::String).set_nullable(true),
404            FieldDescriptor::new("status_code", DataKind::UInt16).set_nullable(true),
405            FieldDescriptor::new("status_text", DataKind::String).set_nullable(true),
406            FieldDescriptor::new("host", DataKind::String).set_nullable(true),
407            FieldDescriptor::new("content_type", DataKind::String).set_nullable(true),
408            FieldDescriptor::new("content_length", DataKind::UInt64).set_nullable(true),
409            FieldDescriptor::new("user_agent", DataKind::String).set_nullable(true),
410            FieldDescriptor::new("server", DataKind::String).set_nullable(true),
411            FieldDescriptor::new("transfer_encoding", DataKind::String).set_nullable(true),
412            FieldDescriptor::new("connection", DataKind::String).set_nullable(true),
413            FieldDescriptor::new("cookie", DataKind::String).set_nullable(true),
414            FieldDescriptor::new("set_cookie", DataKind::String).set_nullable(true),
415            FieldDescriptor::new("referer", DataKind::String).set_nullable(true),
416            FieldDescriptor::new("accept", DataKind::String).set_nullable(true),
417            FieldDescriptor::new("accept_encoding", DataKind::String).set_nullable(true),
418            FieldDescriptor::new("accept_language", DataKind::String).set_nullable(true),
419            FieldDescriptor::new("cache_control", DataKind::String).set_nullable(true),
420            FieldDescriptor::new("authorization", DataKind::String).set_nullable(true),
421            FieldDescriptor::new("location", DataKind::String).set_nullable(true),
422            FieldDescriptor::new("x_forwarded_for", DataKind::String).set_nullable(true),
423            FieldDescriptor::new("x_real_ip", DataKind::String).set_nullable(true),
424        ]
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431    use crate::stream::Direction;
432    use std::net::Ipv4Addr;
433
434    fn test_context() -> StreamContext {
435        StreamContext {
436            connection_id: 1,
437            direction: Direction::ToServer,
438            src_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
439            dst_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
440            src_port: 54321,
441            dst_port: 80,
442            bytes_parsed: 0,
443            messages_parsed: 0,
444            alpn: None,
445        }
446    }
447
448    #[test]
449    fn test_simple_get_request() {
450        let parser = HttpStreamParser::new();
451        let data = b"GET /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n";
452
453        let result = parser.parse_stream(data, &test_context());
454
455        match result {
456            StreamParseResult::Complete {
457                messages,
458                bytes_consumed,
459            } => {
460                assert_eq!(bytes_consumed, data.len());
461                assert_eq!(messages.len(), 1);
462                let msg = &messages[0];
463                assert_eq!(
464                    msg.fields.get("method"),
465                    Some(&FieldValue::OwnedString(CompactString::new("GET")))
466                );
467                assert_eq!(
468                    msg.fields.get("uri"),
469                    Some(&FieldValue::OwnedString(CompactString::new("/index.html")))
470                );
471            }
472            _ => panic!("Expected Complete"),
473        }
474    }
475
476    #[test]
477    fn test_post_with_body() {
478        let parser = HttpStreamParser::new();
479        let body = r#"{"key": "value"}"#;
480        let request = format!(
481            "POST /api HTTP/1.1\r\nHost: api.example.com\r\nContent-Length: {}\r\n\r\n{}",
482            body.len(),
483            body
484        );
485
486        let result = parser.parse_stream(request.as_bytes(), &test_context());
487
488        match result {
489            StreamParseResult::Complete {
490                messages,
491                bytes_consumed,
492            } => {
493                assert_eq!(bytes_consumed, request.len());
494                assert_eq!(
495                    messages[0].fields.get("method"),
496                    Some(&FieldValue::OwnedString(CompactString::new("POST")))
497                );
498            }
499            _ => panic!("Expected Complete"),
500        }
501    }
502
503    #[test]
504    fn test_response_with_content_length() {
505        let parser = HttpStreamParser::new();
506        let body = "<html>Hello</html>";
507        let response = format!(
508            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/html\r\n\r\n{}",
509            body.len(),
510            body
511        );
512
513        let mut ctx = test_context();
514        ctx.direction = Direction::ToClient;
515
516        let result = parser.parse_stream(response.as_bytes(), &ctx);
517
518        match result {
519            StreamParseResult::Complete { messages, .. } => {
520                assert_eq!(
521                    messages[0].fields.get("status_code"),
522                    Some(&FieldValue::UInt16(200))
523                );
524                assert_eq!(
525                    messages[0].fields.get("is_request"),
526                    Some(&FieldValue::Bool(false))
527                );
528            }
529            _ => panic!("Expected Complete"),
530        }
531    }
532
533    #[test]
534    fn test_chunked_encoding() {
535        let parser = HttpStreamParser::new();
536        let response =
537            "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nHello\r\n0\r\n\r\n";
538
539        let mut ctx = test_context();
540        ctx.direction = Direction::ToClient;
541
542        let result = parser.parse_stream(response.as_bytes(), &ctx);
543
544        match result {
545            StreamParseResult::Complete { bytes_consumed, .. } => {
546                assert_eq!(bytes_consumed, response.len());
547            }
548            _ => panic!("Expected Complete"),
549        }
550    }
551
552    #[test]
553    fn test_chunked_with_extensions() {
554        let parser = HttpStreamParser::new();
555        // Chunked encoding with chunk extension (name=value after size)
556        let response =
557            "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5;name=value\r\nHello\r\n0\r\n\r\n";
558
559        let mut ctx = test_context();
560        ctx.direction = Direction::ToClient;
561
562        let result = parser.parse_stream(response.as_bytes(), &ctx);
563
564        match result {
565            StreamParseResult::Complete { bytes_consumed, .. } => {
566                assert_eq!(bytes_consumed, response.len());
567            }
568            _ => panic!("Expected Complete"),
569        }
570    }
571
572    #[test]
573    fn test_keepalive_multiple_requests() {
574        let parser = HttpStreamParser::new();
575        let requests = "GET /page1 HTTP/1.1\r\nHost: example.com\r\n\r\nGET /page2 HTTP/1.1\r\nHost: example.com\r\n\r\n";
576
577        // First request
578        let result = parser.parse_stream(requests.as_bytes(), &test_context());
579        match result {
580            StreamParseResult::Complete {
581                messages,
582                bytes_consumed,
583            } => {
584                assert_eq!(
585                    messages[0].fields.get("uri"),
586                    Some(&FieldValue::OwnedString(CompactString::new("/page1")))
587                );
588
589                // Second request (remaining bytes)
590                let result2 =
591                    parser.parse_stream(&requests.as_bytes()[bytes_consumed..], &test_context());
592                match result2 {
593                    StreamParseResult::Complete {
594                        messages: msgs2, ..
595                    } => {
596                        assert_eq!(
597                            msgs2[0].fields.get("uri"),
598                            Some(&FieldValue::OwnedString(CompactString::new("/page2")))
599                        );
600                    }
601                    _ => panic!("Expected Complete for second request"),
602                }
603            }
604            _ => panic!("Expected Complete"),
605        }
606    }
607
608    #[test]
609    fn test_incomplete_header() {
610        let parser = HttpStreamParser::new();
611        let partial = b"GET /index.html HTTP/1.1\r\nHost: exam";
612
613        let result = parser.parse_stream(partial, &test_context());
614
615        match result {
616            StreamParseResult::NeedMore { .. } => {}
617            _ => panic!("Expected NeedMore"),
618        }
619    }
620
621    #[test]
622    fn test_incomplete_body() {
623        let parser = HttpStreamParser::new();
624        let partial = "POST /api HTTP/1.1\r\nContent-Length: 100\r\n\r\npartial";
625
626        let result = parser.parse_stream(partial.as_bytes(), &test_context());
627
628        match result {
629            StreamParseResult::NeedMore { minimum_bytes } => {
630                // Should need headers + 100 bytes
631                assert!(minimum_bytes.is_some());
632            }
633            _ => panic!("Expected NeedMore"),
634        }
635    }
636
637    #[test]
638    fn test_not_http() {
639        let parser = HttpStreamParser::new();
640        let garbage = b"NOTHTTP random garbage data\x00\x01\x02";
641
642        let result = parser.parse_stream(garbage, &test_context());
643
644        match result {
645            StreamParseResult::NotThisProtocol => {}
646            _ => panic!("Expected NotThisProtocol"),
647        }
648    }
649
650    #[test]
651    fn test_response_with_all_headers() {
652        let parser = HttpStreamParser::new();
653        let response = "HTTP/1.1 302 Found\r\n\
654            Server: nginx/1.18.0\r\n\
655            Content-Type: text/html\r\n\
656            Content-Length: 0\r\n\
657            Location: https://example.com/new\r\n\
658            Set-Cookie: session=abc123; HttpOnly\r\n\
659            Cache-Control: no-cache\r\n\
660            \r\n";
661
662        let mut ctx = test_context();
663        ctx.direction = Direction::ToClient;
664
665        let result = parser.parse_stream(response.as_bytes(), &ctx);
666
667        match result {
668            StreamParseResult::Complete { messages, .. } => {
669                let msg = &messages[0];
670                assert_eq!(
671                    msg.fields.get("status_code"),
672                    Some(&FieldValue::UInt16(302))
673                );
674                assert_eq!(
675                    msg.fields.get("location"),
676                    Some(&FieldValue::OwnedString(CompactString::new(
677                        "https://example.com/new"
678                    )))
679                );
680                assert!(msg.fields.get("set_cookie").is_some());
681                assert_eq!(
682                    msg.fields.get("cache_control"),
683                    Some(&FieldValue::OwnedString(CompactString::new("no-cache")))
684                );
685            }
686            _ => panic!("Expected Complete"),
687        }
688    }
689
690    #[test]
691    fn test_request_with_cookie() {
692        let parser = HttpStreamParser::new();
693        let request = "GET /api HTTP/1.1\r\n\
694            Host: api.example.com\r\n\
695            Cookie: session=xyz789; user=john\r\n\
696            Authorization: Bearer token123\r\n\
697            X-Forwarded-For: 10.0.0.1\r\n\
698            \r\n";
699
700        let result = parser.parse_stream(request.as_bytes(), &test_context());
701
702        match result {
703            StreamParseResult::Complete { messages, .. } => {
704                let msg = &messages[0];
705                assert_eq!(
706                    msg.fields.get("cookie"),
707                    Some(&FieldValue::OwnedString(CompactString::new(
708                        "session=xyz789; user=john"
709                    )))
710                );
711                // Auth should only contain the type
712                assert_eq!(
713                    msg.fields.get("authorization"),
714                    Some(&FieldValue::OwnedString(CompactString::new("Bearer")))
715                );
716                assert_eq!(
717                    msg.fields.get("x_forwarded_for"),
718                    Some(&FieldValue::OwnedString(CompactString::new("10.0.0.1")))
719                );
720            }
721            _ => panic!("Expected Complete"),
722        }
723    }
724
725    #[test]
726    fn test_http10_request() {
727        let parser = HttpStreamParser::new();
728        let request = "GET / HTTP/1.0\r\n\r\n";
729
730        let result = parser.parse_stream(request.as_bytes(), &test_context());
731
732        match result {
733            StreamParseResult::Complete { messages, .. } => {
734                assert_eq!(
735                    messages[0].fields.get("http_version"),
736                    Some(&FieldValue::OwnedString(CompactString::new("HTTP/1.0")))
737                );
738            }
739            _ => panic!("Expected Complete"),
740        }
741    }
742
743    #[test]
744    fn test_head_request() {
745        let parser = HttpStreamParser::new();
746        let request = "HEAD /status HTTP/1.1\r\nHost: example.com\r\n\r\n";
747
748        let result = parser.parse_stream(request.as_bytes(), &test_context());
749
750        match result {
751            StreamParseResult::Complete { messages, .. } => {
752                assert_eq!(
753                    messages[0].fields.get("method"),
754                    Some(&FieldValue::OwnedString(CompactString::new("HEAD")))
755                );
756            }
757            _ => panic!("Expected Complete"),
758        }
759    }
760
761    #[test]
762    fn test_options_request() {
763        let parser = HttpStreamParser::new();
764        let request = "OPTIONS * HTTP/1.1\r\nHost: example.com\r\n\r\n";
765
766        let result = parser.parse_stream(request.as_bytes(), &test_context());
767
768        match result {
769            StreamParseResult::Complete { messages, .. } => {
770                assert_eq!(
771                    messages[0].fields.get("method"),
772                    Some(&FieldValue::OwnedString(CompactString::new("OPTIONS")))
773                );
774            }
775            _ => panic!("Expected Complete"),
776        }
777    }
778
779    #[test]
780    fn test_100_continue_response() {
781        let parser = HttpStreamParser::new();
782        let response = "HTTP/1.1 100 Continue\r\n\r\n";
783
784        let mut ctx = test_context();
785        ctx.direction = Direction::ToClient;
786
787        let result = parser.parse_stream(response.as_bytes(), &ctx);
788
789        match result {
790            StreamParseResult::Complete { messages, .. } => {
791                assert_eq!(
792                    messages[0].fields.get("status_code"),
793                    Some(&FieldValue::UInt16(100))
794                );
795            }
796            _ => panic!("Expected Complete"),
797        }
798    }
799}