1use bytes::{Buf, Bytes, BytesMut};
2use http::{
3 HeaderMap, HeaderName, HeaderValue, Method, Request, Response, StatusCode, Uri, Version,
4};
5
/// Four-byte marker written at the start of every encoded message and checked first by the decoder.
const MAGIC: [u8; 4] = *b"HPK1";
/// Wire-format version byte that follows the magic; the decoder rejects any other value.
const FORMAT_VERSION: u8 = 1;
/// Kind byte identifying an encoded request.
const KIND_REQUEST: u8 = 1;
/// Kind byte identifying an encoded response.
const KIND_RESPONSE: u8 = 2;
/// Largest header count the decoder accepts for a single message.
const MAX_HEADERS: u64 = 8192;
11
12#[cfg(feature = "h1")]
13pub mod h1;
14
15#[cfg(feature = "body")]
16pub mod body;
17
18#[cfg(feature = "h3")]
19pub mod h3;
20
21pub mod stream;
22
/// HTTP protocol versions that a packed message can be tagged with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HttpVersion {
    /// HTTP/1.1 (wire byte `1`).
    Http11,
    /// HTTP/2 (wire byte `2`).
    H2,
    /// HTTP/3 (wire byte `3`).
    H3,
}
29
30impl HttpVersion {
31 fn from_http(version: Version) -> Result<Self, EncodeError> {
32 match version {
33 Version::HTTP_11 => Ok(Self::Http11),
34 Version::HTTP_2 => Ok(Self::H2),
35 Version::HTTP_3 => Ok(Self::H3),
36 other => Err(EncodeError::UnsupportedHttpVersion(other)),
37 }
38 }
39
40 fn from_byte(byte: u8) -> Result<Self, DecodeError> {
41 match byte {
42 1 => Ok(Self::Http11),
43 2 => Ok(Self::H2),
44 3 => Ok(Self::H3),
45 other => Err(DecodeError::UnsupportedHttpVersion(other)),
46 }
47 }
48
49 fn to_byte(self) -> u8 {
50 match self {
51 Self::Http11 => 1,
52 Self::H2 => 2,
53 Self::H3 => 3,
54 }
55 }
56}
57
/// A single header as raw bytes, exactly as it is carried in the packed format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderField {
    /// Header name bytes.
    pub name: Vec<u8>,
    /// Header value bytes.
    pub value: Vec<u8>,
}
63
/// An HTTP request broken into the raw byte fields stored by the packed format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PackedRequest {
    /// HTTP version the request was tagged with.
    pub version: HttpVersion,
    /// Request method bytes (for example `GET`).
    pub method: Vec<u8>,
    /// URI scheme, or `None` when absent (an empty scheme on the wire decodes to `None`).
    pub scheme: Option<Vec<u8>>,
    /// URI authority, or `None` when absent (an empty authority on the wire decodes to `None`).
    pub authority: Option<Vec<u8>>,
    /// Path and query bytes; an empty path on the wire decodes to `/`.
    pub path: Vec<u8>,
    /// Header fields in the order they were collected or decoded.
    pub headers: Vec<HeaderField>,
    /// Complete request body.
    pub body: Vec<u8>,
}
74
/// An HTTP response broken into the raw fields stored by the packed format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PackedResponse {
    /// HTTP version the response was tagged with.
    pub version: HttpVersion,
    /// Numeric status code; written on the wire as two big-endian bytes.
    pub status: u16,
    /// Header fields in the order they were collected or decoded.
    pub headers: Vec<HeaderField>,
    /// Complete response body.
    pub body: Vec<u8>,
}
82
/// Either kind of message that can be encoded to, or decoded from, the packed format.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PackedMessage {
    /// A packed request.
    Request(PackedRequest),
    /// A packed response.
    Response(PackedResponse),
}
88
/// Errors produced while converting an `http` message into its packed form.
#[derive(Debug)]
pub enum EncodeError {
    /// The message uses an HTTP version other than HTTP/1.1, HTTP/2 or HTTP/3.
    UnsupportedHttpVersion(Version),
}
93
94impl std::fmt::Display for EncodeError {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 EncodeError::UnsupportedHttpVersion(version) => {
98 write!(f, "unsupported http version: {:?}", version)
99 }
100 }
101 }
102}
103
104impl std::error::Error for EncodeError {}
105
/// Errors produced while decoding a packed message or rendering one as HTTP/1.1.
#[derive(Debug)]
pub enum DecodeError {
    /// The input ended before a complete message was available.
    Incomplete,
    /// The input does not start with the expected magic bytes.
    InvalidMagic,
    /// The format version byte is not one this decoder understands.
    UnsupportedFormatVersion(u8),
    /// The HTTP version byte is not `1`, `2` or `3`.
    UnsupportedHttpVersion(u8),
    /// The message kind byte is neither request nor response.
    InvalidKind(u8),
    /// A varint did not terminate within ten bytes.
    InvalidVarint,
    /// A declared length does not fit in `usize`.
    LengthOverflow,
    /// The declared header count exceeds the decoder's limit.
    TooManyHeaders(u64),
    /// The method is empty or not a valid HTTP method.
    InvalidMethod,
    /// The path is empty, contains CR/LF, or could not be parsed as a URI.
    InvalidPath,
    /// A header name was rejected by `HeaderName::from_bytes`.
    InvalidHeaderName,
    /// A header value was rejected (also used for an invalid authority written as `host`).
    InvalidHeaderValue,
    /// The status code was rejected by `StatusCode::from_u16`.
    InvalidStatus,
    /// A complete message was decoded but this many bytes were left over.
    TrailingBytes(usize),
    /// A request was expected but a response was decoded, or vice versa.
    UnexpectedMessageKind,
}
124
125impl std::fmt::Display for DecodeError {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 match self {
128 DecodeError::Incomplete => write!(f, "incomplete payload"),
129 DecodeError::InvalidMagic => write!(f, "invalid magic"),
130 DecodeError::UnsupportedFormatVersion(version) => {
131 write!(f, "unsupported format version: {}", version)
132 }
133 DecodeError::UnsupportedHttpVersion(version) => {
134 write!(f, "unsupported http version: {}", version)
135 }
136 DecodeError::InvalidKind(kind) => write!(f, "invalid message kind: {}", kind),
137 DecodeError::InvalidVarint => write!(f, "invalid varint"),
138 DecodeError::LengthOverflow => write!(f, "length overflow"),
139 DecodeError::TooManyHeaders(count) => write!(f, "too many headers: {}", count),
140 DecodeError::InvalidMethod => write!(f, "invalid method"),
141 DecodeError::InvalidPath => write!(f, "invalid path"),
142 DecodeError::InvalidHeaderName => write!(f, "invalid header name"),
143 DecodeError::InvalidHeaderValue => write!(f, "invalid header value"),
144 DecodeError::InvalidStatus => write!(f, "invalid status"),
145 DecodeError::TrailingBytes(remaining) => {
146 write!(f, "trailing bytes: {}", remaining)
147 }
148 DecodeError::UnexpectedMessageKind => write!(f, "unexpected message kind"),
149 }
150 }
151}
152
153impl std::error::Error for DecodeError {}
154
155impl PackedRequest {
156 pub fn from_request<B: AsRef<[u8]>>(req: &Request<B>) -> Result<Self, EncodeError> {
157 let version = HttpVersion::from_http(req.version())?;
158 let method = req.method().as_str().as_bytes().to_vec();
159
160 let uri = req.uri();
161 let scheme = uri.scheme_str().map(|s| s.as_bytes().to_vec());
162 let authority = uri
163 .authority()
164 .map(|a| a.as_str().as_bytes().to_vec())
165 .or_else(|| req.headers().get("host").map(|v| v.as_bytes().to_vec()));
166 let path = uri
167 .path_and_query()
168 .map(|pq| pq.as_str())
169 .unwrap_or("/");
170 let path = if path.is_empty() { "/" } else { path };
171 let headers = collect_headers(req.headers());
172 let body = req.body().as_ref().to_vec();
173
174 Ok(Self {
175 version,
176 method,
177 scheme,
178 authority,
179 path: path.as_bytes().to_vec(),
180 headers,
181 body,
182 })
183 }
184
185 pub fn to_http1_bytes(&self) -> Result<Vec<u8>, DecodeError> {
186 validate_method(&self.method)?;
187 validate_path(&self.path)?;
188
189 let mut out = Vec::new();
190 out.extend_from_slice(&self.method);
191 out.extend_from_slice(b" ");
192 out.extend_from_slice(&self.path);
193 out.extend_from_slice(b" HTTP/1.1\r\n");
194
195 let mut has_host = false;
196 let mut has_content_length = false;
197
198 for header in &self.headers {
199 if eq_ignore_ascii_case(&header.name, b"transfer-encoding") {
200 continue;
201 }
202 if eq_ignore_ascii_case(&header.name, b"host") {
203 has_host = true;
204 }
205 if eq_ignore_ascii_case(&header.name, b"content-length") {
206 has_content_length = true;
207 }
208 validate_header_field(header)?;
209 out.extend_from_slice(&header.name);
210 out.extend_from_slice(b": ");
211 out.extend_from_slice(&header.value);
212 out.extend_from_slice(b"\r\n");
213 }
214
215 if !has_host {
216 if let Some(authority) = &self.authority {
217 if has_crlf(authority) {
218 return Err(DecodeError::InvalidHeaderValue);
219 }
220 out.extend_from_slice(b"host: ");
221 out.extend_from_slice(authority);
222 out.extend_from_slice(b"\r\n");
223 }
224 }
225
226 if !has_content_length {
227 let len = self.body.len().to_string();
228 out.extend_from_slice(b"content-length: ");
229 out.extend_from_slice(len.as_bytes());
230 out.extend_from_slice(b"\r\n");
231 }
232
233 out.extend_from_slice(b"\r\n");
234 out.extend_from_slice(&self.body);
235 Ok(out)
236 }
237
238 pub fn into_http1_request(self) -> Result<Request<Bytes>, DecodeError> {
239 validate_method(&self.method)?;
240 let method = Method::from_bytes(&self.method).map_err(|_| DecodeError::InvalidMethod)?;
241 let path = if self.path.is_empty() {
242 b"/".as_slice()
243 } else {
244 self.path.as_slice()
245 };
246 let path_str = std::str::from_utf8(path).map_err(|_| DecodeError::InvalidPath)?;
247 let uri = path_str.parse::<Uri>().map_err(|_| DecodeError::InvalidPath)?;
248
249 let mut builder = Request::builder().method(method).uri(uri).version(Version::HTTP_11);
250 let mut has_host = false;
251 let mut has_content_length = false;
252 for header in &self.headers {
253 if eq_ignore_ascii_case(&header.name, b"transfer-encoding") {
254 continue;
255 }
256 let name = HeaderName::from_bytes(&header.name)
257 .map_err(|_| DecodeError::InvalidHeaderName)?;
258 let value = HeaderValue::from_bytes(&header.value)
259 .map_err(|_| DecodeError::InvalidHeaderValue)?;
260 if eq_ignore_ascii_case(&header.name, b"host") {
261 has_host = true;
262 }
263 if eq_ignore_ascii_case(&header.name, b"content-length") {
264 has_content_length = true;
265 }
266 builder = builder.header(name, value);
267 }
268
269 if !has_host {
270 if let Some(authority) = &self.authority {
271 let value = HeaderValue::from_bytes(authority)
272 .map_err(|_| DecodeError::InvalidHeaderValue)?;
273 builder = builder.header("host", value);
274 }
275 }
276
277 if !has_content_length {
278 let len = self.body.len().to_string();
279 builder = builder.header("content-length", len);
280 }
281
282 builder
283 .body(Bytes::from(self.body))
284 .map_err(|_| DecodeError::InvalidPath)
285 }
286}
287
288impl PackedResponse {
289 pub fn from_response<B: AsRef<[u8]>>(resp: &Response<B>) -> Result<Self, EncodeError> {
290 let version = HttpVersion::from_http(resp.version())?;
291 let status = resp.status().as_u16();
292 let headers = collect_headers(resp.headers());
293 let body = resp.body().as_ref().to_vec();
294
295 Ok(Self {
296 version,
297 status,
298 headers,
299 body,
300 })
301 }
302
303 pub fn to_http1_bytes(&self) -> Result<Vec<u8>, DecodeError> {
304 let status = StatusCode::from_u16(self.status).map_err(|_| DecodeError::InvalidStatus)?;
305 let reason = status.canonical_reason().unwrap_or("");
306
307 let mut out = Vec::new();
308 out.extend_from_slice(b"HTTP/1.1 ");
309 out.extend_from_slice(status.as_str().as_bytes());
310 if !reason.is_empty() {
311 out.extend_from_slice(b" ");
312 out.extend_from_slice(reason.as_bytes());
313 }
314 out.extend_from_slice(b"\r\n");
315
316 let mut has_content_length = false;
317 for header in &self.headers {
318 if eq_ignore_ascii_case(&header.name, b"transfer-encoding") {
319 continue;
320 }
321 if eq_ignore_ascii_case(&header.name, b"content-length") {
322 has_content_length = true;
323 }
324 validate_header_field(header)?;
325 out.extend_from_slice(&header.name);
326 out.extend_from_slice(b": ");
327 out.extend_from_slice(&header.value);
328 out.extend_from_slice(b"\r\n");
329 }
330
331 if !has_content_length {
332 let len = self.body.len().to_string();
333 out.extend_from_slice(b"content-length: ");
334 out.extend_from_slice(len.as_bytes());
335 out.extend_from_slice(b"\r\n");
336 }
337
338 out.extend_from_slice(b"\r\n");
339 out.extend_from_slice(&self.body);
340 Ok(out)
341 }
342
343 pub fn into_http1_response(self) -> Result<Response<Bytes>, DecodeError> {
344 let status = StatusCode::from_u16(self.status).map_err(|_| DecodeError::InvalidStatus)?;
345 let mut builder = Response::builder().status(status).version(Version::HTTP_11);
346 let mut has_content_length = false;
347 for header in &self.headers {
348 if eq_ignore_ascii_case(&header.name, b"transfer-encoding") {
349 continue;
350 }
351 let name = HeaderName::from_bytes(&header.name)
352 .map_err(|_| DecodeError::InvalidHeaderName)?;
353 let value = HeaderValue::from_bytes(&header.value)
354 .map_err(|_| DecodeError::InvalidHeaderValue)?;
355 if eq_ignore_ascii_case(&header.name, b"content-length") {
356 has_content_length = true;
357 }
358 builder = builder.header(name, value);
359 }
360
361 if !has_content_length {
362 let len = self.body.len().to_string();
363 builder = builder.header("content-length", len);
364 }
365
366 builder
367 .body(Bytes::from(self.body))
368 .map_err(|_| DecodeError::InvalidStatus)
369 }
370}
371
372pub fn encode_request<B: AsRef<[u8]>>(req: &Request<B>) -> Result<Vec<u8>, EncodeError> {
373 let packed = PackedRequest::from_request(req)?;
374 Ok(encode_message(&PackedMessage::Request(packed)))
375}
376
377pub fn encode_response<B: AsRef<[u8]>>(resp: &Response<B>) -> Result<Vec<u8>, EncodeError> {
378 let packed = PackedResponse::from_response(resp)?;
379 Ok(encode_message(&PackedMessage::Response(packed)))
380}
381
382pub fn encode_message(message: &PackedMessage) -> Vec<u8> {
383 let mut buf = Vec::new();
384 buf.extend_from_slice(&MAGIC);
385 buf.push(FORMAT_VERSION);
386
387 match message {
388 PackedMessage::Request(req) => {
389 buf.push(KIND_REQUEST);
390 buf.push(req.version.to_byte());
391 encode_request_fields(req, &mut buf);
392 }
393 PackedMessage::Response(resp) => {
394 buf.push(KIND_RESPONSE);
395 buf.push(resp.version.to_byte());
396 encode_response_fields(resp, &mut buf);
397 }
398 }
399
400 buf
401}
402
403pub fn decode(bytes: &[u8]) -> Result<PackedMessage, DecodeError> {
404 match decode_from_prefix(bytes)? {
405 Some((message, consumed)) => {
406 if consumed != bytes.len() {
407 return Err(DecodeError::TrailingBytes(bytes.len() - consumed));
408 }
409 Ok(message)
410 }
411 None => Err(DecodeError::Incomplete),
412 }
413}
414
415pub fn decode_request(bytes: &[u8]) -> Result<PackedRequest, DecodeError> {
416 match decode(bytes)? {
417 PackedMessage::Request(request) => Ok(request),
418 PackedMessage::Response(_) => Err(DecodeError::UnexpectedMessageKind),
419 }
420}
421
422pub fn decode_response(bytes: &[u8]) -> Result<PackedResponse, DecodeError> {
423 match decode(bytes)? {
424 PackedMessage::Response(response) => Ok(response),
425 PackedMessage::Request(_) => Err(DecodeError::UnexpectedMessageKind),
426 }
427}
428
429pub struct Decoder {
430 buf: BytesMut,
431}
432
433impl Decoder {
434 pub fn new() -> Self {
435 Self {
436 buf: BytesMut::new(),
437 }
438 }
439
440 pub fn push(&mut self, data: &[u8]) {
441 self.buf.extend_from_slice(data);
442 }
443
444 pub fn try_decode(&mut self) -> Result<Option<PackedMessage>, DecodeError> {
445 match decode_from_prefix(&self.buf)? {
446 Some((message, consumed)) => {
447 self.buf.advance(consumed);
448 Ok(Some(message))
449 }
450 None => Ok(None),
451 }
452 }
453
454 pub fn buffer_len(&self) -> usize {
455 self.buf.len()
456 }
457}
458
/// Wrappers that carry encoded messages and stream frames as
/// `message_packetizer` signable messages.
pub mod packetizer {
    use super::{decode, encode_message, EncodeError, PackedMessage};
    use super::stream::{decode_frame, encode_frame, StreamDecodeError, StreamFrame};
    use message_packetizer::SignableMessage;

    /// Serializable envelope holding one encoded [`PackedMessage`].
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    pub struct HttpPackMessage {
        /// Bytes produced by `encode_message`.
        pub payload: Vec<u8>,
    }

    impl HttpPackMessage {
        /// Encodes `message` and wraps the resulting bytes.
        pub fn from_message(message: &PackedMessage) -> Self {
            Self {
                payload: encode_message(message),
            }
        }

        /// Decodes the payload back into a [`PackedMessage`].
        pub fn decode(&self) -> Result<PackedMessage, super::DecodeError> {
            decode(&self.payload)
        }

        /// Decodes the payload back into a [`PackedMessage`]; performs the same call as `decode`.
        pub fn try_into_message(&self) -> Result<PackedMessage, super::DecodeError> {
            decode(&self.payload)
        }

        /// Encodes an `http::Request` with an in-memory body and wraps the bytes.
        pub fn from_request<B: AsRef<[u8]>>(
            req: &http::Request<B>,
        ) -> Result<Self, EncodeError> {
            Ok(Self {
                payload: super::encode_request(req)?,
            })
        }

        /// Encodes an `http::Response` with an in-memory body and wraps the bytes.
        pub fn from_response<B: AsRef<[u8]>>(
            resp: &http::Response<B>,
        ) -> Result<Self, EncodeError> {
            Ok(Self {
                payload: super::encode_response(resp)?,
            })
        }
    }

    impl SignableMessage for HttpPackMessage {}

    /// Serializable envelope holding one encoded [`StreamFrame`].
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    pub struct HttpPackStreamMessage {
        /// Bytes produced by `encode_frame`.
        pub payload: Vec<u8>,
    }

    impl HttpPackStreamMessage {
        /// Encodes `frame` and wraps the resulting bytes.
        pub fn from_frame(frame: &StreamFrame) -> Self {
            Self {
                payload: encode_frame(frame),
            }
        }

        /// Decodes the payload back into a [`StreamFrame`].
        pub fn decode(&self) -> Result<StreamFrame, StreamDecodeError> {
            decode_frame(&self.payload)
        }

        /// Decodes the payload back into a [`StreamFrame`]; performs the same call as `decode`.
        pub fn try_into_frame(&self) -> Result<StreamFrame, StreamDecodeError> {
            decode_frame(&self.payload)
        }
    }

    impl SignableMessage for HttpPackStreamMessage {}

    /// Streaming encoders that hand each produced frame to a callback as an
    /// [`HttpPackStreamMessage`]. Compiled only with the `body` or `h3` feature.
    #[cfg(any(feature = "body", feature = "h3"))]
    pub mod stream {
        use super::HttpPackStreamMessage;
        use crate::stream::{self, StreamEncodeError};

        /// Forwards to `stream::body::encode_request`, wrapping every frame it
        /// produces in an [`HttpPackStreamMessage`] before passing it to `emit`.
        #[cfg(feature = "body")]
        pub async fn encode_request<B, F, E>(
            req: http::Request<B>,
            stream_id: u64,
            mut emit: F,
        ) -> Result<(), StreamEncodeError<E>>
        where
            B: http_body::Body + Unpin,
            B::Data: bytes::Buf,
            B::Error: std::error::Error + Send + Sync + 'static,
            F: FnMut(HttpPackStreamMessage) -> Result<(), E>,
        {
            stream::body::encode_request(req, stream_id, |frame| {
                emit(HttpPackStreamMessage::from_frame(&frame))
            })
            .await
        }

        /// Forwards to `stream::body::encode_response`, wrapping every frame it
        /// produces in an [`HttpPackStreamMessage`] before passing it to `emit`.
        #[cfg(feature = "body")]
        pub async fn encode_response<B, F, E>(
            resp: http::Response<B>,
            stream_id: u64,
            mut emit: F,
        ) -> Result<(), StreamEncodeError<E>>
        where
            B: http_body::Body + Unpin,
            B::Data: bytes::Buf,
            B::Error: std::error::Error + Send + Sync + 'static,
            F: FnMut(HttpPackStreamMessage) -> Result<(), E>,
        {
            stream::body::encode_response(resp, stream_id, |frame| {
                emit(HttpPackStreamMessage::from_frame(&frame))
            })
            .await
        }

        /// Forwards to `stream::h3::encode_server_request`, wrapping every frame
        /// it produces in an [`HttpPackStreamMessage`] before passing it to `emit`.
        #[cfg(feature = "h3")]
        pub async fn encode_server_request<S, B, F, E>(
            req: http::Request<()>,
            stream_id: u64,
            stream: &mut h3::server::RequestStream<S, B>,
            mut emit: F,
        ) -> Result<(), StreamEncodeError<E>>
        where
            S: h3::quic::RecvStream,
            B: bytes::Buf,
            F: FnMut(HttpPackStreamMessage) -> Result<(), E>,
        {
            stream::h3::encode_server_request(req, stream_id, stream, |frame| {
                emit(HttpPackStreamMessage::from_frame(&frame))
            })
            .await
        }

        /// Forwards to `stream::h3::encode_client_response`, wrapping every frame
        /// it produces in an [`HttpPackStreamMessage`] before passing it to `emit`.
        #[cfg(feature = "h3")]
        pub async fn encode_client_response<S, B, F, E>(
            resp: http::Response<()>,
            stream_id: u64,
            stream: &mut h3::client::RequestStream<S, B>,
            mut emit: F,
        ) -> Result<(), StreamEncodeError<E>>
        where
            S: h3::quic::RecvStream,
            B: bytes::Buf,
            F: FnMut(HttpPackStreamMessage) -> Result<(), E>,
        {
            stream::h3::encode_client_response(resp, stream_id, stream, |frame| {
                emit(HttpPackStreamMessage::from_frame(&frame))
            })
            .await
        }
    }
}
604
605fn collect_headers(headers: &HeaderMap) -> Vec<HeaderField> {
606 headers
607 .iter()
608 .map(|(name, value)| HeaderField {
609 name: name.as_str().as_bytes().to_vec(),
610 value: value.as_bytes().to_vec(),
611 })
612 .collect()
613}
614
615fn encode_request_fields(req: &PackedRequest, buf: &mut Vec<u8>) {
616 put_varint(buf, req.method.len() as u64);
617 buf.extend_from_slice(&req.method);
618
619 if let Some(scheme) = &req.scheme {
620 put_varint(buf, scheme.len() as u64);
621 buf.extend_from_slice(scheme);
622 } else {
623 put_varint(buf, 0);
624 }
625
626 if let Some(authority) = &req.authority {
627 put_varint(buf, authority.len() as u64);
628 buf.extend_from_slice(authority);
629 } else {
630 put_varint(buf, 0);
631 }
632
633 put_varint(buf, req.path.len() as u64);
634 buf.extend_from_slice(&req.path);
635
636 put_varint(buf, req.headers.len() as u64);
637 for header in &req.headers {
638 put_varint(buf, header.name.len() as u64);
639 buf.extend_from_slice(&header.name);
640 put_varint(buf, header.value.len() as u64);
641 buf.extend_from_slice(&header.value);
642 }
643
644 put_varint(buf, req.body.len() as u64);
645 buf.extend_from_slice(&req.body);
646}
647
648fn encode_response_fields(resp: &PackedResponse, buf: &mut Vec<u8>) {
649 buf.extend_from_slice(&resp.status.to_be_bytes());
650
651 put_varint(buf, resp.headers.len() as u64);
652 for header in &resp.headers {
653 put_varint(buf, header.name.len() as u64);
654 buf.extend_from_slice(&header.name);
655 put_varint(buf, header.value.len() as u64);
656 buf.extend_from_slice(&header.value);
657 }
658
659 put_varint(buf, resp.body.len() as u64);
660 buf.extend_from_slice(&resp.body);
661}
662
663fn decode_from_prefix(bytes: &[u8]) -> Result<Option<(PackedMessage, usize)>, DecodeError> {
664 let mut offset = 0usize;
665
666 if bytes.len() < MAGIC.len() {
667 return Ok(None);
668 }
669 if &bytes[..MAGIC.len()] != MAGIC {
670 return Err(DecodeError::InvalidMagic);
671 }
672 offset += MAGIC.len();
673
674 if bytes.len() < offset + 3 {
675 return Ok(None);
676 }
677 let format_version = bytes[offset];
678 offset += 1;
679 if format_version != FORMAT_VERSION {
680 return Err(DecodeError::UnsupportedFormatVersion(format_version));
681 }
682
683 let kind = bytes[offset];
684 offset += 1;
685 let http_version = HttpVersion::from_byte(bytes[offset])?;
686 offset += 1;
687
688 match kind {
689 KIND_REQUEST => {
690 let method = read_bytes(bytes, &mut offset)?;
691 let scheme = read_bytes(bytes, &mut offset)?;
692 let authority = read_bytes(bytes, &mut offset)?;
693 let path = read_bytes(bytes, &mut offset)?;
694
695 let method = match method {
696 Some(value) if !value.is_empty() => value,
697 Some(_) => return Err(DecodeError::InvalidMethod),
698 None => return Ok(None),
699 };
700
701 let scheme = match scheme {
702 Some(value) if value.is_empty() => None,
703 Some(value) => Some(value),
704 None => return Ok(None),
705 };
706
707 let authority = match authority {
708 Some(value) if value.is_empty() => None,
709 Some(value) => Some(value),
710 None => return Ok(None),
711 };
712
713 let path = match path {
714 Some(value) if value.is_empty() => b"/".to_vec(),
715 Some(value) => value,
716 None => return Ok(None),
717 };
718
719 validate_method(&method)?;
720 validate_path(&path)?;
721
722 let header_count = match read_varint(bytes, &mut offset)? {
723 Some(value) => value,
724 None => return Ok(None),
725 };
726 if header_count > MAX_HEADERS {
727 return Err(DecodeError::TooManyHeaders(header_count));
728 }
729 let mut headers = Vec::with_capacity(header_count as usize);
730 for _ in 0..header_count {
731 let name = match read_bytes(bytes, &mut offset)? {
732 Some(value) => value,
733 None => return Ok(None),
734 };
735 let value = match read_bytes(bytes, &mut offset)? {
736 Some(value) => value,
737 None => return Ok(None),
738 };
739 validate_header_name(&name)?;
740 validate_header_value(&value)?;
741 headers.push(HeaderField { name, value });
742 }
743
744 let body_len = match read_varint(bytes, &mut offset)? {
745 Some(value) => value,
746 None => return Ok(None),
747 };
748 let body = read_raw(bytes, &mut offset, body_len)?;
749 let body = match body {
750 Some(value) => value,
751 None => return Ok(None),
752 };
753
754 Ok(Some((
755 PackedMessage::Request(PackedRequest {
756 version: http_version,
757 method,
758 scheme,
759 authority,
760 path,
761 headers,
762 body,
763 }),
764 offset,
765 )))
766 }
767 KIND_RESPONSE => {
768 if bytes.len() < offset + 2 {
769 return Ok(None);
770 }
771 let status = u16::from_be_bytes([bytes[offset], bytes[offset + 1]]);
772 offset += 2;
773 if StatusCode::from_u16(status).is_err() {
774 return Err(DecodeError::InvalidStatus);
775 }
776
777 let header_count = match read_varint(bytes, &mut offset)? {
778 Some(value) => value,
779 None => return Ok(None),
780 };
781 if header_count > MAX_HEADERS {
782 return Err(DecodeError::TooManyHeaders(header_count));
783 }
784 let mut headers = Vec::with_capacity(header_count as usize);
785 for _ in 0..header_count {
786 let name = match read_bytes(bytes, &mut offset)? {
787 Some(value) => value,
788 None => return Ok(None),
789 };
790 let value = match read_bytes(bytes, &mut offset)? {
791 Some(value) => value,
792 None => return Ok(None),
793 };
794 validate_header_name(&name)?;
795 validate_header_value(&value)?;
796 headers.push(HeaderField { name, value });
797 }
798
799 let body_len = match read_varint(bytes, &mut offset)? {
800 Some(value) => value,
801 None => return Ok(None),
802 };
803 let body = read_raw(bytes, &mut offset, body_len)?;
804 let body = match body {
805 Some(value) => value,
806 None => return Ok(None),
807 };
808
809 Ok(Some((
810 PackedMessage::Response(PackedResponse {
811 version: http_version,
812 status,
813 headers,
814 body,
815 }),
816 offset,
817 )))
818 }
819 other => Err(DecodeError::InvalidKind(other)),
820 }
821}
822
/// Appends `value` to `buf` as a little-endian base-128 varint: seven payload
/// bits per byte, with the high bit set on every byte except the last.
fn put_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low_bits = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            // Final byte: continuation bit clear.
            buf.push(low_bits);
            return;
        }
        buf.push(low_bits | 0x80);
    }
}
830
831fn read_varint(bytes: &[u8], offset: &mut usize) -> Result<Option<u64>, DecodeError> {
832 let mut value: u64 = 0;
833 let mut shift = 0;
834
835 for _ in 0..10 {
836 if *offset >= bytes.len() {
837 return Ok(None);
838 }
839 let byte = bytes[*offset];
840 *offset += 1;
841 value |= ((byte & 0x7f) as u64) << shift;
842 if (byte & 0x80) == 0 {
843 return Ok(Some(value));
844 }
845 shift += 7;
846 }
847
848 Err(DecodeError::InvalidVarint)
849}
850
851fn read_bytes(bytes: &[u8], offset: &mut usize) -> Result<Option<Vec<u8>>, DecodeError> {
852 let len = match read_varint(bytes, offset)? {
853 Some(value) => value,
854 None => return Ok(None),
855 };
856 let data = read_raw(bytes, offset, len)?;
857 Ok(data)
858}
859
860fn read_raw(
861 bytes: &[u8],
862 offset: &mut usize,
863 len: u64,
864) -> Result<Option<Vec<u8>>, DecodeError> {
865 let len = usize::try_from(len).map_err(|_| DecodeError::LengthOverflow)?;
866 if bytes.len() < *offset + len {
867 return Ok(None);
868 }
869 let data = bytes[*offset..*offset + len].to_vec();
870 *offset += len;
871 Ok(Some(data))
872}
873
874fn validate_method(method: &[u8]) -> Result<(), DecodeError> {
875 Method::from_bytes(method).map_err(|_| DecodeError::InvalidMethod)?;
876 Ok(())
877}
878
879fn validate_path(path: &[u8]) -> Result<(), DecodeError> {
880 if path.is_empty() || has_crlf(path) {
881 return Err(DecodeError::InvalidPath);
882 }
883 Ok(())
884}
885
886fn validate_header_name(name: &[u8]) -> Result<(), DecodeError> {
887 HeaderName::from_bytes(name).map_err(|_| DecodeError::InvalidHeaderName)?;
888 Ok(())
889}
890
891fn validate_header_value(value: &[u8]) -> Result<(), DecodeError> {
892 HeaderValue::from_bytes(value).map_err(|_| DecodeError::InvalidHeaderValue)?;
893 Ok(())
894}
895
896fn validate_header_field(field: &HeaderField) -> Result<(), DecodeError> {
897 validate_header_name(&field.name)?;
898 validate_header_value(&field.value)?;
899 Ok(())
900}
901
/// Returns `true` when both byte strings have the same length and are equal
/// once ASCII letters are compared without regard to case.
fn eq_ignore_ascii_case(left: &[u8], right: &[u8]) -> bool {
    // The slice method performs the length check and the per-byte comparison.
    <[u8]>::eq_ignore_ascii_case(left, right)
}
910
/// Returns `true` when `data` contains a carriage return or a line feed.
fn has_crlf(data: &[u8]) -> bool {
    data.contains(&b'\r') || data.contains(&b'\n')
}
914
// Round-trip and streaming tests for the packed message format.
#[cfg(test)]
mod tests {
    use super::*;

    // A request survives encode -> decode, and renders as HTTP/1.1 with the
    // host taken from the URI authority and a computed content-length.
    #[test]
    fn request_roundtrip() {
        let req = Request::builder()
            .method("POST")
            .uri("https://example.com/ingest?x=1")
            .header("x-test", "value")
            .body(Bytes::from_static(b"hello"))
            .unwrap();

        let encoded = encode_request(&req).unwrap();
        let decoded = decode(&encoded).unwrap();

        let packed = match decoded {
            PackedMessage::Request(packed) => packed,
            _ => panic!("expected request"),
        };

        assert_eq!(packed.method, b"POST".to_vec());
        assert_eq!(packed.path, b"/ingest?x=1".to_vec());
        assert_eq!(packed.body, b"hello".to_vec());

        let http1 = packed.to_http1_bytes().unwrap();
        let http1_str = String::from_utf8(http1).unwrap();
        assert!(http1_str.starts_with("POST /ingest?x=1 HTTP/1.1\r\n"));
        assert!(http1_str.contains("host: example.com\r\n"));
        assert!(http1_str.contains("content-length: 5\r\n"));
    }

    // The incremental decoder reports "not yet" for half a message and yields
    // the message, leaving nothing buffered, once the rest arrives.
    #[test]
    fn decoder_streaming() {
        let req = Request::builder()
            .method("GET")
            .uri("/status")
            .body(Bytes::from_static(b""))
            .unwrap();

        let encoded = encode_request(&req).unwrap();
        let mid = encoded.len() / 2;

        let mut decoder = Decoder::new();
        decoder.push(&encoded[..mid]);
        assert!(decoder.try_decode().unwrap().is_none());

        decoder.push(&encoded[mid..]);
        let message = decoder.try_decode().unwrap();
        assert!(matches!(message, Some(PackedMessage::Request(_))));
        assert_eq!(decoder.buffer_len(), 0);
    }

    // A response survives encode -> decode and renders the canonical reason
    // phrase in its HTTP/1.1 status line.
    #[test]
    fn response_roundtrip() {
        let resp = Response::builder()
            .status(204)
            .header("x-reply", "ok")
            .body(Bytes::from_static(b""))
            .unwrap();

        let encoded = encode_response(&resp).unwrap();
        let decoded = decode(&encoded).unwrap();

        let packed = match decoded {
            PackedMessage::Response(packed) => packed,
            _ => panic!("expected response"),
        };

        assert_eq!(packed.status, 204);
        let http1 = packed.to_http1_bytes().unwrap();
        let http1_str = String::from_utf8(http1).unwrap();
        assert!(http1_str.starts_with("HTTP/1.1 204 No Content\r\n"));
    }
}