http_pack/
lib.rs

1use bytes::{Buf, Bytes, BytesMut};
2use http::{
3    HeaderMap, HeaderName, HeaderValue, Method, Request, Response, StatusCode, Uri, Version,
4};
5
6const MAGIC: [u8; 4] = *b"HPK1";
7const FORMAT_VERSION: u8 = 1;
8const KIND_REQUEST: u8 = 1;
9const KIND_RESPONSE: u8 = 2;
10const MAX_HEADERS: u64 = 8192;
11
12#[cfg(feature = "h1")]
13pub mod h1;
14
15#[cfg(feature = "body")]
16pub mod body;
17
18#[cfg(feature = "h3")]
19pub mod h3;
20
21pub mod stream;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum HttpVersion {
25    Http11,
26    H2,
27    H3,
28}
29
30impl HttpVersion {
31    fn from_http(version: Version) -> Result<Self, EncodeError> {
32        match version {
33            Version::HTTP_11 => Ok(Self::Http11),
34            Version::HTTP_2 => Ok(Self::H2),
35            Version::HTTP_3 => Ok(Self::H3),
36            other => Err(EncodeError::UnsupportedHttpVersion(other)),
37        }
38    }
39
40    fn from_byte(byte: u8) -> Result<Self, DecodeError> {
41        match byte {
42            1 => Ok(Self::Http11),
43            2 => Ok(Self::H2),
44            3 => Ok(Self::H3),
45            other => Err(DecodeError::UnsupportedHttpVersion(other)),
46        }
47    }
48
49    fn to_byte(self) -> u8 {
50        match self {
51            Self::Http11 => 1,
52            Self::H2 => 2,
53            Self::H3 => 3,
54        }
55    }
56}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct HeaderField {
60    pub name: Vec<u8>,
61    pub value: Vec<u8>,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct PackedRequest {
66    pub version: HttpVersion,
67    pub method: Vec<u8>,
68    pub scheme: Option<Vec<u8>>,
69    pub authority: Option<Vec<u8>>,
70    pub path: Vec<u8>,
71    pub headers: Vec<HeaderField>,
72    pub body: Vec<u8>,
73}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct PackedResponse {
77    pub version: HttpVersion,
78    pub status: u16,
79    pub headers: Vec<HeaderField>,
80    pub body: Vec<u8>,
81}
82
83#[derive(Debug, Clone, PartialEq, Eq)]
84pub enum PackedMessage {
85    Request(PackedRequest),
86    Response(PackedResponse),
87}
88
89#[derive(Debug)]
90pub enum EncodeError {
91    UnsupportedHttpVersion(Version),
92}
93
94impl std::fmt::Display for EncodeError {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            EncodeError::UnsupportedHttpVersion(version) => {
98                write!(f, "unsupported http version: {:?}", version)
99            }
100        }
101    }
102}
103
104impl std::error::Error for EncodeError {}
105
106#[derive(Debug)]
107pub enum DecodeError {
108    Incomplete,
109    InvalidMagic,
110    UnsupportedFormatVersion(u8),
111    UnsupportedHttpVersion(u8),
112    InvalidKind(u8),
113    InvalidVarint,
114    LengthOverflow,
115    TooManyHeaders(u64),
116    InvalidMethod,
117    InvalidPath,
118    InvalidHeaderName,
119    InvalidHeaderValue,
120    InvalidStatus,
121    TrailingBytes(usize),
122    UnexpectedMessageKind,
123}
124
125impl std::fmt::Display for DecodeError {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        match self {
128            DecodeError::Incomplete => write!(f, "incomplete payload"),
129            DecodeError::InvalidMagic => write!(f, "invalid magic"),
130            DecodeError::UnsupportedFormatVersion(version) => {
131                write!(f, "unsupported format version: {}", version)
132            }
133            DecodeError::UnsupportedHttpVersion(version) => {
134                write!(f, "unsupported http version: {}", version)
135            }
136            DecodeError::InvalidKind(kind) => write!(f, "invalid message kind: {}", kind),
137            DecodeError::InvalidVarint => write!(f, "invalid varint"),
138            DecodeError::LengthOverflow => write!(f, "length overflow"),
139            DecodeError::TooManyHeaders(count) => write!(f, "too many headers: {}", count),
140            DecodeError::InvalidMethod => write!(f, "invalid method"),
141            DecodeError::InvalidPath => write!(f, "invalid path"),
142            DecodeError::InvalidHeaderName => write!(f, "invalid header name"),
143            DecodeError::InvalidHeaderValue => write!(f, "invalid header value"),
144            DecodeError::InvalidStatus => write!(f, "invalid status"),
145            DecodeError::TrailingBytes(remaining) => {
146                write!(f, "trailing bytes: {}", remaining)
147            }
148            DecodeError::UnexpectedMessageKind => write!(f, "unexpected message kind"),
149        }
150    }
151}
152
153impl std::error::Error for DecodeError {}
154
155impl PackedRequest {
156    pub fn from_request<B: AsRef<[u8]>>(req: &Request<B>) -> Result<Self, EncodeError> {
157        let version = HttpVersion::from_http(req.version())?;
158        let method = req.method().as_str().as_bytes().to_vec();
159
160        let uri = req.uri();
161        let scheme = uri.scheme_str().map(|s| s.as_bytes().to_vec());
162        let authority = uri
163            .authority()
164            .map(|a| a.as_str().as_bytes().to_vec())
165            .or_else(|| req.headers().get("host").map(|v| v.as_bytes().to_vec()));
166        let path = uri
167            .path_and_query()
168            .map(|pq| pq.as_str())
169            .unwrap_or("/");
170        let path = if path.is_empty() { "/" } else { path };
171        let headers = collect_headers(req.headers());
172        let body = req.body().as_ref().to_vec();
173
174        Ok(Self {
175            version,
176            method,
177            scheme,
178            authority,
179            path: path.as_bytes().to_vec(),
180            headers,
181            body,
182        })
183    }
184
185    pub fn to_http1_bytes(&self) -> Result<Vec<u8>, DecodeError> {
186        validate_method(&self.method)?;
187        validate_path(&self.path)?;
188
189        let mut out = Vec::new();
190        out.extend_from_slice(&self.method);
191        out.extend_from_slice(b" ");
192        out.extend_from_slice(&self.path);
193        out.extend_from_slice(b" HTTP/1.1\r\n");
194
195        let mut has_host = false;
196        let mut has_content_length = false;
197
198        for header in &self.headers {
199            if eq_ignore_ascii_case(&header.name, b"transfer-encoding") {
200                continue;
201            }
202            if eq_ignore_ascii_case(&header.name, b"host") {
203                has_host = true;
204            }
205            if eq_ignore_ascii_case(&header.name, b"content-length") {
206                has_content_length = true;
207            }
208            validate_header_field(header)?;
209            out.extend_from_slice(&header.name);
210            out.extend_from_slice(b": ");
211            out.extend_from_slice(&header.value);
212            out.extend_from_slice(b"\r\n");
213        }
214
215        if !has_host {
216            if let Some(authority) = &self.authority {
217                if has_crlf(authority) {
218                    return Err(DecodeError::InvalidHeaderValue);
219                }
220                out.extend_from_slice(b"host: ");
221                out.extend_from_slice(authority);
222                out.extend_from_slice(b"\r\n");
223            }
224        }
225
226        if !has_content_length {
227            let len = self.body.len().to_string();
228            out.extend_from_slice(b"content-length: ");
229            out.extend_from_slice(len.as_bytes());
230            out.extend_from_slice(b"\r\n");
231        }
232
233        out.extend_from_slice(b"\r\n");
234        out.extend_from_slice(&self.body);
235        Ok(out)
236    }
237
238    pub fn into_http1_request(self) -> Result<Request<Bytes>, DecodeError> {
239        validate_method(&self.method)?;
240        let method = Method::from_bytes(&self.method).map_err(|_| DecodeError::InvalidMethod)?;
241        let path = if self.path.is_empty() {
242            b"/".as_slice()
243        } else {
244            self.path.as_slice()
245        };
246        let path_str = std::str::from_utf8(path).map_err(|_| DecodeError::InvalidPath)?;
247        let uri = path_str.parse::<Uri>().map_err(|_| DecodeError::InvalidPath)?;
248
249        let mut builder = Request::builder().method(method).uri(uri).version(Version::HTTP_11);
250        let mut has_host = false;
251        let mut has_content_length = false;
252        for header in &self.headers {
253            if eq_ignore_ascii_case(&header.name, b"transfer-encoding") {
254                continue;
255            }
256            let name = HeaderName::from_bytes(&header.name)
257                .map_err(|_| DecodeError::InvalidHeaderName)?;
258            let value = HeaderValue::from_bytes(&header.value)
259                .map_err(|_| DecodeError::InvalidHeaderValue)?;
260            if eq_ignore_ascii_case(&header.name, b"host") {
261                has_host = true;
262            }
263            if eq_ignore_ascii_case(&header.name, b"content-length") {
264                has_content_length = true;
265            }
266            builder = builder.header(name, value);
267        }
268
269        if !has_host {
270            if let Some(authority) = &self.authority {
271                let value = HeaderValue::from_bytes(authority)
272                    .map_err(|_| DecodeError::InvalidHeaderValue)?;
273                builder = builder.header("host", value);
274            }
275        }
276
277        if !has_content_length {
278            let len = self.body.len().to_string();
279            builder = builder.header("content-length", len);
280        }
281
282        builder
283            .body(Bytes::from(self.body))
284            .map_err(|_| DecodeError::InvalidPath)
285    }
286}
287
288impl PackedResponse {
289    pub fn from_response<B: AsRef<[u8]>>(resp: &Response<B>) -> Result<Self, EncodeError> {
290        let version = HttpVersion::from_http(resp.version())?;
291        let status = resp.status().as_u16();
292        let headers = collect_headers(resp.headers());
293        let body = resp.body().as_ref().to_vec();
294
295        Ok(Self {
296            version,
297            status,
298            headers,
299            body,
300        })
301    }
302
303    pub fn to_http1_bytes(&self) -> Result<Vec<u8>, DecodeError> {
304        let status = StatusCode::from_u16(self.status).map_err(|_| DecodeError::InvalidStatus)?;
305        let reason = status.canonical_reason().unwrap_or("");
306
307        let mut out = Vec::new();
308        out.extend_from_slice(b"HTTP/1.1 ");
309        out.extend_from_slice(status.as_str().as_bytes());
310        if !reason.is_empty() {
311            out.extend_from_slice(b" ");
312            out.extend_from_slice(reason.as_bytes());
313        }
314        out.extend_from_slice(b"\r\n");
315
316        let mut has_content_length = false;
317        for header in &self.headers {
318            if eq_ignore_ascii_case(&header.name, b"transfer-encoding") {
319                continue;
320            }
321            if eq_ignore_ascii_case(&header.name, b"content-length") {
322                has_content_length = true;
323            }
324            validate_header_field(header)?;
325            out.extend_from_slice(&header.name);
326            out.extend_from_slice(b": ");
327            out.extend_from_slice(&header.value);
328            out.extend_from_slice(b"\r\n");
329        }
330
331        if !has_content_length {
332            let len = self.body.len().to_string();
333            out.extend_from_slice(b"content-length: ");
334            out.extend_from_slice(len.as_bytes());
335            out.extend_from_slice(b"\r\n");
336        }
337
338        out.extend_from_slice(b"\r\n");
339        out.extend_from_slice(&self.body);
340        Ok(out)
341    }
342
343    pub fn into_http1_response(self) -> Result<Response<Bytes>, DecodeError> {
344        let status = StatusCode::from_u16(self.status).map_err(|_| DecodeError::InvalidStatus)?;
345        let mut builder = Response::builder().status(status).version(Version::HTTP_11);
346        let mut has_content_length = false;
347        for header in &self.headers {
348            if eq_ignore_ascii_case(&header.name, b"transfer-encoding") {
349                continue;
350            }
351            let name = HeaderName::from_bytes(&header.name)
352                .map_err(|_| DecodeError::InvalidHeaderName)?;
353            let value = HeaderValue::from_bytes(&header.value)
354                .map_err(|_| DecodeError::InvalidHeaderValue)?;
355            if eq_ignore_ascii_case(&header.name, b"content-length") {
356                has_content_length = true;
357            }
358            builder = builder.header(name, value);
359        }
360
361        if !has_content_length {
362            let len = self.body.len().to_string();
363            builder = builder.header("content-length", len);
364        }
365
366        builder
367            .body(Bytes::from(self.body))
368            .map_err(|_| DecodeError::InvalidStatus)
369    }
370}
371
372pub fn encode_request<B: AsRef<[u8]>>(req: &Request<B>) -> Result<Vec<u8>, EncodeError> {
373    let packed = PackedRequest::from_request(req)?;
374    Ok(encode_message(&PackedMessage::Request(packed)))
375}
376
377pub fn encode_response<B: AsRef<[u8]>>(resp: &Response<B>) -> Result<Vec<u8>, EncodeError> {
378    let packed = PackedResponse::from_response(resp)?;
379    Ok(encode_message(&PackedMessage::Response(packed)))
380}
381
382pub fn encode_message(message: &PackedMessage) -> Vec<u8> {
383    let mut buf = Vec::new();
384    buf.extend_from_slice(&MAGIC);
385    buf.push(FORMAT_VERSION);
386
387    match message {
388        PackedMessage::Request(req) => {
389            buf.push(KIND_REQUEST);
390            buf.push(req.version.to_byte());
391            encode_request_fields(req, &mut buf);
392        }
393        PackedMessage::Response(resp) => {
394            buf.push(KIND_RESPONSE);
395            buf.push(resp.version.to_byte());
396            encode_response_fields(resp, &mut buf);
397        }
398    }
399
400    buf
401}
402
403pub fn decode(bytes: &[u8]) -> Result<PackedMessage, DecodeError> {
404    match decode_from_prefix(bytes)? {
405        Some((message, consumed)) => {
406            if consumed != bytes.len() {
407                return Err(DecodeError::TrailingBytes(bytes.len() - consumed));
408            }
409            Ok(message)
410        }
411        None => Err(DecodeError::Incomplete),
412    }
413}
414
415pub fn decode_request(bytes: &[u8]) -> Result<PackedRequest, DecodeError> {
416    match decode(bytes)? {
417        PackedMessage::Request(request) => Ok(request),
418        PackedMessage::Response(_) => Err(DecodeError::UnexpectedMessageKind),
419    }
420}
421
422pub fn decode_response(bytes: &[u8]) -> Result<PackedResponse, DecodeError> {
423    match decode(bytes)? {
424        PackedMessage::Response(response) => Ok(response),
425        PackedMessage::Request(_) => Err(DecodeError::UnexpectedMessageKind),
426    }
427}
428
429pub struct Decoder {
430    buf: BytesMut,
431}
432
433impl Decoder {
434    pub fn new() -> Self {
435        Self {
436            buf: BytesMut::new(),
437        }
438    }
439
440    pub fn push(&mut self, data: &[u8]) {
441        self.buf.extend_from_slice(data);
442    }
443
444    pub fn try_decode(&mut self) -> Result<Option<PackedMessage>, DecodeError> {
445        match decode_from_prefix(&self.buf)? {
446            Some((message, consumed)) => {
447                self.buf.advance(consumed);
448                Ok(Some(message))
449            }
450            None => Ok(None),
451        }
452    }
453
454    pub fn buffer_len(&self) -> usize {
455        self.buf.len()
456    }
457}
458
459pub mod packetizer {
460    use super::{decode, encode_message, EncodeError, PackedMessage};
461    use super::stream::{decode_frame, encode_frame, StreamDecodeError, StreamFrame};
462    use message_packetizer::SignableMessage;
463
464    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
465    pub struct HttpPackMessage {
466        pub payload: Vec<u8>,
467    }
468
469    impl HttpPackMessage {
470        pub fn from_message(message: &PackedMessage) -> Self {
471            Self {
472                payload: encode_message(message),
473            }
474        }
475
476        pub fn decode(&self) -> Result<PackedMessage, super::DecodeError> {
477            decode(&self.payload)
478        }
479
480        pub fn try_into_message(&self) -> Result<PackedMessage, super::DecodeError> {
481            decode(&self.payload)
482        }
483
484        pub fn from_request<B: AsRef<[u8]>>(
485            req: &http::Request<B>,
486        ) -> Result<Self, EncodeError> {
487            Ok(Self {
488                payload: super::encode_request(req)?,
489            })
490        }
491
492        pub fn from_response<B: AsRef<[u8]>>(
493            resp: &http::Response<B>,
494        ) -> Result<Self, EncodeError> {
495            Ok(Self {
496                payload: super::encode_response(resp)?,
497            })
498        }
499    }
500
501    impl SignableMessage for HttpPackMessage {}
502
503    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
504    pub struct HttpPackStreamMessage {
505        pub payload: Vec<u8>,
506    }
507
508    impl HttpPackStreamMessage {
509        pub fn from_frame(frame: &StreamFrame) -> Self {
510            Self {
511                payload: encode_frame(frame),
512            }
513        }
514
515        pub fn decode(&self) -> Result<StreamFrame, StreamDecodeError> {
516            decode_frame(&self.payload)
517        }
518
519        pub fn try_into_frame(&self) -> Result<StreamFrame, StreamDecodeError> {
520            decode_frame(&self.payload)
521        }
522    }
523
524    impl SignableMessage for HttpPackStreamMessage {}
525
526    #[cfg(any(feature = "body", feature = "h3"))]
527    pub mod stream {
528        use super::HttpPackStreamMessage;
529        use crate::stream::{self, StreamEncodeError};
530
531        #[cfg(feature = "body")]
532        pub async fn encode_request<B, F, E>(
533            req: http::Request<B>,
534            stream_id: u64,
535            mut emit: F,
536        ) -> Result<(), StreamEncodeError<E>>
537        where
538            B: http_body::Body + Unpin,
539            B::Data: bytes::Buf,
540            B::Error: std::error::Error + Send + Sync + 'static,
541            F: FnMut(HttpPackStreamMessage) -> Result<(), E>,
542        {
543            stream::body::encode_request(req, stream_id, |frame| {
544                emit(HttpPackStreamMessage::from_frame(&frame))
545            })
546            .await
547        }
548
549        #[cfg(feature = "body")]
550        pub async fn encode_response<B, F, E>(
551            resp: http::Response<B>,
552            stream_id: u64,
553            mut emit: F,
554        ) -> Result<(), StreamEncodeError<E>>
555        where
556            B: http_body::Body + Unpin,
557            B::Data: bytes::Buf,
558            B::Error: std::error::Error + Send + Sync + 'static,
559            F: FnMut(HttpPackStreamMessage) -> Result<(), E>,
560        {
561            stream::body::encode_response(resp, stream_id, |frame| {
562                emit(HttpPackStreamMessage::from_frame(&frame))
563            })
564            .await
565        }
566
567        #[cfg(feature = "h3")]
568        pub async fn encode_server_request<S, B, F, E>(
569            req: http::Request<()>,
570            stream_id: u64,
571            stream: &mut h3::server::RequestStream<S, B>,
572            mut emit: F,
573        ) -> Result<(), StreamEncodeError<E>>
574        where
575            S: h3::quic::RecvStream,
576            B: bytes::Buf,
577            F: FnMut(HttpPackStreamMessage) -> Result<(), E>,
578        {
579            stream::h3::encode_server_request(req, stream_id, stream, |frame| {
580                emit(HttpPackStreamMessage::from_frame(&frame))
581            })
582            .await
583        }
584
585        #[cfg(feature = "h3")]
586        pub async fn encode_client_response<S, B, F, E>(
587            resp: http::Response<()>,
588            stream_id: u64,
589            stream: &mut h3::client::RequestStream<S, B>,
590            mut emit: F,
591        ) -> Result<(), StreamEncodeError<E>>
592        where
593            S: h3::quic::RecvStream,
594            B: bytes::Buf,
595            F: FnMut(HttpPackStreamMessage) -> Result<(), E>,
596        {
597            stream::h3::encode_client_response(resp, stream_id, stream, |frame| {
598                emit(HttpPackStreamMessage::from_frame(&frame))
599            })
600            .await
601        }
602    }
603}
604
605fn collect_headers(headers: &HeaderMap) -> Vec<HeaderField> {
606    headers
607        .iter()
608        .map(|(name, value)| HeaderField {
609            name: name.as_str().as_bytes().to_vec(),
610            value: value.as_bytes().to_vec(),
611        })
612        .collect()
613}
614
615fn encode_request_fields(req: &PackedRequest, buf: &mut Vec<u8>) {
616    put_varint(buf, req.method.len() as u64);
617    buf.extend_from_slice(&req.method);
618
619    if let Some(scheme) = &req.scheme {
620        put_varint(buf, scheme.len() as u64);
621        buf.extend_from_slice(scheme);
622    } else {
623        put_varint(buf, 0);
624    }
625
626    if let Some(authority) = &req.authority {
627        put_varint(buf, authority.len() as u64);
628        buf.extend_from_slice(authority);
629    } else {
630        put_varint(buf, 0);
631    }
632
633    put_varint(buf, req.path.len() as u64);
634    buf.extend_from_slice(&req.path);
635
636    put_varint(buf, req.headers.len() as u64);
637    for header in &req.headers {
638        put_varint(buf, header.name.len() as u64);
639        buf.extend_from_slice(&header.name);
640        put_varint(buf, header.value.len() as u64);
641        buf.extend_from_slice(&header.value);
642    }
643
644    put_varint(buf, req.body.len() as u64);
645    buf.extend_from_slice(&req.body);
646}
647
648fn encode_response_fields(resp: &PackedResponse, buf: &mut Vec<u8>) {
649    buf.extend_from_slice(&resp.status.to_be_bytes());
650
651    put_varint(buf, resp.headers.len() as u64);
652    for header in &resp.headers {
653        put_varint(buf, header.name.len() as u64);
654        buf.extend_from_slice(&header.name);
655        put_varint(buf, header.value.len() as u64);
656        buf.extend_from_slice(&header.value);
657    }
658
659    put_varint(buf, resp.body.len() as u64);
660    buf.extend_from_slice(&resp.body);
661}
662
663fn decode_from_prefix(bytes: &[u8]) -> Result<Option<(PackedMessage, usize)>, DecodeError> {
664    let mut offset = 0usize;
665
666    if bytes.len() < MAGIC.len() {
667        return Ok(None);
668    }
669    if &bytes[..MAGIC.len()] != MAGIC {
670        return Err(DecodeError::InvalidMagic);
671    }
672    offset += MAGIC.len();
673
674    if bytes.len() < offset + 3 {
675        return Ok(None);
676    }
677    let format_version = bytes[offset];
678    offset += 1;
679    if format_version != FORMAT_VERSION {
680        return Err(DecodeError::UnsupportedFormatVersion(format_version));
681    }
682
683    let kind = bytes[offset];
684    offset += 1;
685    let http_version = HttpVersion::from_byte(bytes[offset])?;
686    offset += 1;
687
688    match kind {
689        KIND_REQUEST => {
690            let method = read_bytes(bytes, &mut offset)?;
691            let scheme = read_bytes(bytes, &mut offset)?;
692            let authority = read_bytes(bytes, &mut offset)?;
693            let path = read_bytes(bytes, &mut offset)?;
694
695            let method = match method {
696                Some(value) if !value.is_empty() => value,
697                Some(_) => return Err(DecodeError::InvalidMethod),
698                None => return Ok(None),
699            };
700
701            let scheme = match scheme {
702                Some(value) if value.is_empty() => None,
703                Some(value) => Some(value),
704                None => return Ok(None),
705            };
706
707            let authority = match authority {
708                Some(value) if value.is_empty() => None,
709                Some(value) => Some(value),
710                None => return Ok(None),
711            };
712
713            let path = match path {
714                Some(value) if value.is_empty() => b"/".to_vec(),
715                Some(value) => value,
716                None => return Ok(None),
717            };
718
719            validate_method(&method)?;
720            validate_path(&path)?;
721
722            let header_count = match read_varint(bytes, &mut offset)? {
723                Some(value) => value,
724                None => return Ok(None),
725            };
726            if header_count > MAX_HEADERS {
727                return Err(DecodeError::TooManyHeaders(header_count));
728            }
729            let mut headers = Vec::with_capacity(header_count as usize);
730            for _ in 0..header_count {
731                let name = match read_bytes(bytes, &mut offset)? {
732                    Some(value) => value,
733                    None => return Ok(None),
734                };
735                let value = match read_bytes(bytes, &mut offset)? {
736                    Some(value) => value,
737                    None => return Ok(None),
738                };
739                validate_header_name(&name)?;
740                validate_header_value(&value)?;
741                headers.push(HeaderField { name, value });
742            }
743
744            let body_len = match read_varint(bytes, &mut offset)? {
745                Some(value) => value,
746                None => return Ok(None),
747            };
748            let body = read_raw(bytes, &mut offset, body_len)?;
749            let body = match body {
750                Some(value) => value,
751                None => return Ok(None),
752            };
753
754            Ok(Some((
755                PackedMessage::Request(PackedRequest {
756                    version: http_version,
757                    method,
758                    scheme,
759                    authority,
760                    path,
761                    headers,
762                    body,
763                }),
764                offset,
765            )))
766        }
767        KIND_RESPONSE => {
768            if bytes.len() < offset + 2 {
769                return Ok(None);
770            }
771            let status = u16::from_be_bytes([bytes[offset], bytes[offset + 1]]);
772            offset += 2;
773            if StatusCode::from_u16(status).is_err() {
774                return Err(DecodeError::InvalidStatus);
775            }
776
777            let header_count = match read_varint(bytes, &mut offset)? {
778                Some(value) => value,
779                None => return Ok(None),
780            };
781            if header_count > MAX_HEADERS {
782                return Err(DecodeError::TooManyHeaders(header_count));
783            }
784            let mut headers = Vec::with_capacity(header_count as usize);
785            for _ in 0..header_count {
786                let name = match read_bytes(bytes, &mut offset)? {
787                    Some(value) => value,
788                    None => return Ok(None),
789                };
790                let value = match read_bytes(bytes, &mut offset)? {
791                    Some(value) => value,
792                    None => return Ok(None),
793                };
794                validate_header_name(&name)?;
795                validate_header_value(&value)?;
796                headers.push(HeaderField { name, value });
797            }
798
799            let body_len = match read_varint(bytes, &mut offset)? {
800                Some(value) => value,
801                None => return Ok(None),
802            };
803            let body = read_raw(bytes, &mut offset, body_len)?;
804            let body = match body {
805                Some(value) => value,
806                None => return Ok(None),
807            };
808
809            Ok(Some((
810                PackedMessage::Response(PackedResponse {
811                    version: http_version,
812                    status,
813                    headers,
814                    body,
815                }),
816                offset,
817            )))
818        }
819        other => Err(DecodeError::InvalidKind(other)),
820    }
821}
822
823fn put_varint(buf: &mut Vec<u8>, mut value: u64) {
824    while value >= 0x80 {
825        buf.push(((value as u8) & 0x7f) | 0x80);
826        value >>= 7;
827    }
828    buf.push(value as u8);
829}
830
831fn read_varint(bytes: &[u8], offset: &mut usize) -> Result<Option<u64>, DecodeError> {
832    let mut value: u64 = 0;
833    let mut shift = 0;
834
835    for _ in 0..10 {
836        if *offset >= bytes.len() {
837            return Ok(None);
838        }
839        let byte = bytes[*offset];
840        *offset += 1;
841        value |= ((byte & 0x7f) as u64) << shift;
842        if (byte & 0x80) == 0 {
843            return Ok(Some(value));
844        }
845        shift += 7;
846    }
847
848    Err(DecodeError::InvalidVarint)
849}
850
851fn read_bytes(bytes: &[u8], offset: &mut usize) -> Result<Option<Vec<u8>>, DecodeError> {
852    let len = match read_varint(bytes, offset)? {
853        Some(value) => value,
854        None => return Ok(None),
855    };
856    let data = read_raw(bytes, offset, len)?;
857    Ok(data)
858}
859
860fn read_raw(
861    bytes: &[u8],
862    offset: &mut usize,
863    len: u64,
864) -> Result<Option<Vec<u8>>, DecodeError> {
865    let len = usize::try_from(len).map_err(|_| DecodeError::LengthOverflow)?;
866    if bytes.len() < *offset + len {
867        return Ok(None);
868    }
869    let data = bytes[*offset..*offset + len].to_vec();
870    *offset += len;
871    Ok(Some(data))
872}
873
874fn validate_method(method: &[u8]) -> Result<(), DecodeError> {
875    Method::from_bytes(method).map_err(|_| DecodeError::InvalidMethod)?;
876    Ok(())
877}
878
879fn validate_path(path: &[u8]) -> Result<(), DecodeError> {
880    if path.is_empty() || has_crlf(path) {
881        return Err(DecodeError::InvalidPath);
882    }
883    Ok(())
884}
885
886fn validate_header_name(name: &[u8]) -> Result<(), DecodeError> {
887    HeaderName::from_bytes(name).map_err(|_| DecodeError::InvalidHeaderName)?;
888    Ok(())
889}
890
891fn validate_header_value(value: &[u8]) -> Result<(), DecodeError> {
892    HeaderValue::from_bytes(value).map_err(|_| DecodeError::InvalidHeaderValue)?;
893    Ok(())
894}
895
896fn validate_header_field(field: &HeaderField) -> Result<(), DecodeError> {
897    validate_header_name(&field.name)?;
898    validate_header_value(&field.value)?;
899    Ok(())
900}
901
902fn eq_ignore_ascii_case(left: &[u8], right: &[u8]) -> bool {
903    if left.len() != right.len() {
904        return false;
905    }
906    left.iter()
907        .zip(right.iter())
908        .all(|(a, b)| a.eq_ignore_ascii_case(b))
909}
910
911fn has_crlf(data: &[u8]) -> bool {
912    data.iter().any(|byte| *byte == b'\r' || *byte == b'\n')
913}
914
915#[cfg(test)]
916mod tests {
917    use super::*;
918
919    #[test]
920    fn request_roundtrip() {
921        let req = Request::builder()
922            .method("POST")
923            .uri("https://example.com/ingest?x=1")
924            .header("x-test", "value")
925            .body(Bytes::from_static(b"hello"))
926            .unwrap();
927
928        let encoded = encode_request(&req).unwrap();
929        let decoded = decode(&encoded).unwrap();
930
931        let packed = match decoded {
932            PackedMessage::Request(packed) => packed,
933            _ => panic!("expected request"),
934        };
935
936        assert_eq!(packed.method, b"POST".to_vec());
937        assert_eq!(packed.path, b"/ingest?x=1".to_vec());
938        assert_eq!(packed.body, b"hello".to_vec());
939
940        let http1 = packed.to_http1_bytes().unwrap();
941        let http1_str = String::from_utf8(http1).unwrap();
942        assert!(http1_str.starts_with("POST /ingest?x=1 HTTP/1.1\r\n"));
943        assert!(http1_str.contains("host: example.com\r\n"));
944        assert!(http1_str.contains("content-length: 5\r\n"));
945    }
946
947    #[test]
948    fn decoder_streaming() {
949        let req = Request::builder()
950            .method("GET")
951            .uri("/status")
952            .body(Bytes::from_static(b""))
953            .unwrap();
954
955        let encoded = encode_request(&req).unwrap();
956        let mid = encoded.len() / 2;
957
958        let mut decoder = Decoder::new();
959        decoder.push(&encoded[..mid]);
960        assert!(decoder.try_decode().unwrap().is_none());
961
962        decoder.push(&encoded[mid..]);
963        let message = decoder.try_decode().unwrap();
964        assert!(matches!(message, Some(PackedMessage::Request(_))));
965        assert_eq!(decoder.buffer_len(), 0);
966    }
967
968    #[test]
969    fn response_roundtrip() {
970        let resp = Response::builder()
971            .status(204)
972            .header("x-reply", "ok")
973            .body(Bytes::from_static(b""))
974            .unwrap();
975
976        let encoded = encode_response(&resp).unwrap();
977        let decoded = decode(&encoded).unwrap();
978
979        let packed = match decoded {
980            PackedMessage::Response(packed) => packed,
981            _ => panic!("expected response"),
982        };
983
984        assert_eq!(packed.status, 204);
985        let http1 = packed.to_http1_bytes().unwrap();
986        let http1_str = String::from_utf8(http1).unwrap();
987        assert!(http1_str.starts_with("HTTP/1.1 204 No Content\r\n"));
988    }
989}