use crate::bytes::{Buf, BytesCursor, BytesMut};
use crate::codec::Encoder;
use crate::http::body::{Body, Frame, HeaderMap, HeaderName, HeaderValue, SizeHint};
use crate::http::h1::codec::{
ChunkedBodyDecoder, HttpError, append_chunk_size_line, append_decimal, parse_header_line,
require_transfer_encoding_chunked, unique_header_value, validate_header_field,
};
use crate::http::h1::types::{Method, Request, Response, Version};
use crate::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use memchr::memmem;
use std::future::poll_fn;
use std::pin::Pin;
use std::task::Poll;
/// Default cap on the serialized response head (status line + headers), in bytes.
const DEFAULT_MAX_HEADERS_SIZE: usize = 64 * 1024;
/// Default cap on a fully buffered response body, in bytes.
const DEFAULT_MAX_BODY_SIZE: usize = 16 * 1024 * 1024;
/// Default cap on chunked-encoding trailer bytes.
const DEFAULT_MAX_TRAILERS_SIZE: usize = 64 * 1024;
/// Maximum number of header lines accepted in one response head.
const MAX_HEADERS: usize = 128;
/// Client-side HTTP/1.x codec: encodes `Request`s onto the wire and decodes
/// complete `Response`s from a byte stream.
///
/// Decoding is stateful: the codec tracks where it is inside the current
/// response via [`ClientDecodeState`], and poisons itself on the first error.
pub struct Http1ClientCodec {
    // Position inside the response currently being decoded.
    state: ClientDecodeState,
    // Byte limits enforced during decoding (see the DEFAULT_MAX_* constants).
    max_headers_size: usize,
    max_body_size: usize,
    max_trailers_size: usize,
}
/// Decoder state machine for one response.
///
/// `Head` -> (`Body` | `Chunked` | `Eof`) -> back to `Head` once a complete
/// `Response` has been produced. `Poisoned` is terminal after any error.
enum ClientDecodeState {
    /// Waiting for the status line and headers (terminated by `\r\n\r\n`).
    Head,
    /// Reading a `Content-Length`-framed body; `remaining` counts bytes left.
    Body {
        version: Version,
        status: u16,
        reason: String,
        headers: Vec<(String, String)>,
        remaining: usize,
    },
    /// Reading a `Transfer-Encoding: chunked` body via `ChunkedBodyDecoder`.
    Chunked {
        version: Version,
        status: u16,
        reason: String,
        headers: Vec<(String, String)>,
        chunked: ChunkedBodyDecoder,
    },
    /// No framing header: the body is delimited by connection EOF, so the
    /// response can only be finalized from `decode_eof`.
    Eof {
        version: Version,
        status: u16,
        reason: String,
        headers: Vec<(String, String)>,
    },
    /// A previous call failed; the stream position is untrustworthy.
    Poisoned,
}
impl Http1ClientCodec {
    /// Creates a codec with the default size limits.
    #[must_use]
    pub fn new() -> Self {
        Self {
            state: ClientDecodeState::Head,
            max_headers_size: DEFAULT_MAX_HEADERS_SIZE,
            max_body_size: DEFAULT_MAX_BODY_SIZE,
            max_trailers_size: DEFAULT_MAX_TRAILERS_SIZE,
        }
    }
    /// Builder: overrides the response-head byte limit.
    #[must_use]
    pub fn max_headers_size(mut self, size: usize) -> Self {
        self.max_headers_size = size;
        self
    }
    /// Builder: overrides the response-body byte limit.
    #[must_use]
    pub fn max_body_size(mut self, size: usize) -> Self {
        self.max_body_size = size;
        self
    }
    /// Builder: overrides the chunked-trailers byte limit.
    #[must_use]
    pub fn max_trailers_size(mut self, size: usize) -> Self {
        self.max_trailers_size = size;
        self
    }
}
/// Equivalent to [`Http1ClientCodec::new`].
impl Default for Http1ClientCodec {
    fn default() -> Self {
        Self::new()
    }
}
/// Returns the offset one past the `\r\n\r\n` that terminates a message head,
/// or `None` if the terminator has not been received yet.
fn find_headers_end(buf: &[u8]) -> Option<usize> {
    memmem::find(buf, b"\r\n\r\n").map(|idx| idx + 4)
}
/// Parses an HTTP status line (`HTTP-version SP status-code SP [reason-phrase]`).
///
/// The reason phrase is optional and kept verbatim (it may itself contain
/// spaces, hence `splitn(3, ' ')`).
///
/// Per RFC 9112 the status code is exactly three ASCII digits. `u16::parse`
/// alone would also accept signed or padded forms such as `+200` or `00200`,
/// so the digit shape is validated explicitly before parsing.
///
/// # Errors
///
/// Returns `HttpError::BadRequestLine` for a malformed line or status code,
/// and `HttpError::UnsupportedVersion` for an unknown HTTP version token.
fn parse_status_line(line: &str) -> Result<(Version, u16, String), HttpError> {
    let mut parts = line.splitn(3, ' ');
    let ver = parts.next().ok_or(HttpError::BadRequestLine)?;
    let code = parts.next().ok_or(HttpError::BadRequestLine)?;
    let reason = parts.next().unwrap_or("").to_owned();
    let version = Version::from_bytes(ver.as_bytes()).ok_or(HttpError::UnsupportedVersion)?;
    // status-code = 3DIGIT (RFC 9112 §4): reject anything but exactly three
    // ASCII digits before handing the token to the integer parser.
    if code.len() != 3 || !code.bytes().all(|b| b.is_ascii_digit()) {
        return Err(HttpError::BadRequestLine);
    }
    let status: u16 = code.parse().map_err(|_| HttpError::BadRequestLine)?;
    // Leading zeros ("012") parse below 100 and are rejected here.
    if !(100..=999).contains(&status) {
        return Err(HttpError::BadRequestLine);
    }
    Ok((version, status, reason))
}
/// Determines how the response body is framed, for the streaming client path.
///
/// Framing depends on both the response (status, headers) and the request
/// (`HEAD` responses never carry a body even with a Content-Length header).
///
/// # Errors
///
/// * `AmbiguousBodyLength` when both `Transfer-Encoding` and `Content-Length`
///   are present (smuggling hazard — refuse to guess).
/// * `BadContentLength` for an unparsable `Content-Length` value.
/// * `BodyTooLargeDetailed` when the declared length exceeds the limit.
fn response_body_kind(
    headers: &[(String, String)],
    status: u16,
    request_method: &Method,
    max_body_size_limit: usize,
) -> Result<ClientBodyKind, HttpError> {
    // 1xx, 204 and 304 responses never have a body (RFC 9110 §6.4.1),
    // nor does any response to a HEAD request.
    let no_body_status = (100..=199).contains(&status) || matches!(status, 204 | 304);
    if no_body_status || *request_method == Method::Head {
        return Ok(ClientBodyKind::Empty);
    }
    let te = unique_header_value(headers, "Transfer-Encoding")?;
    let cl = unique_header_value(headers, "Content-Length")?;
    if te.is_some() && cl.is_some() {
        return Err(HttpError::AmbiguousBodyLength);
    }
    if let Some(te) = te {
        // Only "chunked" is supported; anything else errors inside the helper.
        require_transfer_encoding_chunked(te)?;
        return Ok(ClientBodyKind::Chunked {
            state: ChunkedReadState::SizeLine,
            trailers: HeaderMap::new(),
            trailers_bytes: 0,
        });
    }
    if let Some(cl) = cl {
        let content_length: u64 = cl.trim().parse().map_err(|_| HttpError::BadContentLength)?;
        if content_length == 0 {
            return Ok(ClientBodyKind::Empty);
        }
        // Declared length is checked up front so an oversized body is
        // rejected before any of it is buffered.
        let max_body_size = u64::try_from(max_body_size_limit).unwrap_or(u64::MAX);
        if content_length > max_body_size {
            return Err(HttpError::BodyTooLargeDetailed {
                actual: content_length,
                limit: max_body_size,
            });
        }
        return Ok(ClientBodyKind::ContentLength {
            remaining: content_length,
        });
    }
    // No framing header: body runs until the connection closes.
    Ok(ClientBodyKind::Eof)
}
impl crate::codec::Decoder for Http1ClientCodec {
    type Item = Response;
    type Error = HttpError;
    /// Decodes the next complete response from `src`, poisoning the codec on
    /// the first error so later calls cannot resynchronize on a bad stream.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Response>, HttpError> {
        self.decode_inner(src).map_err(|err| {
            self.state = ClientDecodeState::Poisoned;
            err
        })
    }
    /// EOF-aware variant of [`Self::decode`]; also poisons on error.
    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Response>, HttpError> {
        self.decode_inner_eof(src).map_err(|err| {
            self.state = ClientDecodeState::Poisoned;
            err
        })
    }
}
impl Http1ClientCodec {
    /// Core decode state machine: drives `self.state` through
    /// `Head -> (Body | Chunked | Eof) -> Head` and returns a complete
    /// `Response` once an entire message has been buffered.
    #[allow(clippy::too_many_lines)]
    fn decode_inner(&mut self, src: &mut BytesMut) -> Result<Option<Response>, HttpError> {
        loop {
            match &mut self.state {
                // A previous call failed; refuse to keep parsing the stream.
                ClientDecodeState::Poisoned => {
                    return Err(HttpError::BadHeader);
                }
                state @ ClientDecodeState::Head => {
                    let Some(end) = find_headers_end(src.as_ref()) else {
                        // Head incomplete: fail only once it can no longer fit
                        // under the configured limit.
                        if src.len() > self.max_headers_size {
                            return Err(HttpError::HeadersTooLarge);
                        }
                        return Ok(None);
                    };
                    if end > self.max_headers_size {
                        return Err(HttpError::HeadersTooLarge);
                    }
                    let head_bytes = src.split_to(end);
                    let head_str = std::str::from_utf8(head_bytes.as_ref())
                        .map_err(|_| HttpError::BadRequestLine)?;
                    let mut lines = head_str.split("\r\n");
                    let status_line = lines.next().ok_or(HttpError::BadRequestLine)?;
                    let (version, status, reason) = parse_status_line(status_line)?;
                    let mut headers = Vec::new();
                    for line in lines {
                        if line.is_empty() {
                            break;
                        }
                        headers.push(parse_header_line(line)?);
                        if headers.len() > MAX_HEADERS {
                            return Err(HttpError::TooManyHeaders);
                        }
                    }
                    // 1xx/204/304 responses never carry a body (RFC 9110);
                    // emit them immediately with an empty body.
                    if matches!(status, 100..=199 | 204 | 304) {
                        *state = ClientDecodeState::Head;
                        return Ok(Some(Response {
                            version,
                            status,
                            reason,
                            headers,
                            body: Vec::new(),
                            trailers: Vec::new(),
                        }));
                    }
                    let te = unique_header_value(&headers, "Transfer-Encoding")?;
                    let cl = unique_header_value(&headers, "Content-Length")?;
                    // Both framing headers at once is ambiguous (smuggling hazard).
                    if te.is_some() && cl.is_some() {
                        return Err(HttpError::AmbiguousBodyLength);
                    }
                    if let Some(te) = te {
                        require_transfer_encoding_chunked(te)?;
                        *state = ClientDecodeState::Chunked {
                            version,
                            status,
                            reason,
                            headers,
                            chunked: ChunkedBodyDecoder::new(
                                self.max_body_size,
                                self.max_trailers_size,
                            ),
                        };
                        continue;
                    }
                    if let Some(cl) = cl {
                        let content_length: usize =
                            cl.trim().parse().map_err(|_| HttpError::BadContentLength)?;
                        if content_length == 0 {
                            *state = ClientDecodeState::Head;
                            return Ok(Some(Response {
                                version,
                                status,
                                reason,
                                headers,
                                body: Vec::new(),
                                trailers: Vec::new(),
                            }));
                        }
                        // Reject oversized bodies before buffering any of them.
                        if content_length > self.max_body_size {
                            return Err(HttpError::BodyTooLarge);
                        }
                        *state = ClientDecodeState::Body {
                            version,
                            status,
                            reason,
                            headers,
                            remaining: content_length,
                        };
                        continue;
                    }
                    // No framing header: the body runs until connection EOF and
                    // can only be finalized from decode_eof.
                    if src.len() > self.max_body_size {
                        return Err(HttpError::BodyTooLarge);
                    }
                    *state = ClientDecodeState::Eof {
                        version,
                        status,
                        reason,
                        headers,
                    };
                    return Ok(None);
                }
                ClientDecodeState::Body { remaining, .. } => {
                    let need = *remaining;
                    if src.len() < need {
                        return Ok(None);
                    }
                    let body_bytes = src.split_to(need);
                    let old = std::mem::replace(&mut self.state, ClientDecodeState::Head);
                    let ClientDecodeState::Body {
                        version,
                        status,
                        reason,
                        headers,
                        ..
                    } = old
                    else {
                        unreachable!()
                    };
                    return Ok(Some(Response {
                        version,
                        status,
                        reason,
                        headers,
                        body: body_bytes.to_vec(),
                        trailers: Vec::new(),
                    }));
                }
                ClientDecodeState::Chunked { chunked, .. } => {
                    let Some((body, trailers)) = chunked.decode(src)? else {
                        return Ok(None);
                    };
                    let old = std::mem::replace(&mut self.state, ClientDecodeState::Head);
                    let ClientDecodeState::Chunked {
                        version,
                        status,
                        reason,
                        headers,
                        ..
                    } = old
                    else {
                        unreachable!()
                    };
                    return Ok(Some(Response {
                        version,
                        status,
                        reason,
                        headers,
                        body,
                        trailers,
                    }));
                }
                ClientDecodeState::Eof { .. } => {
                    // Keep accumulating the EOF-delimited body in `src`,
                    // enforcing only the size cap; decode_eof emits it.
                    if src.len() > self.max_body_size {
                        return Err(HttpError::BodyTooLarge);
                    }
                    return Ok(None);
                }
            }
        }
    }
    /// Like [`Self::decode_inner`], but the transport has reached EOF, which
    /// both finalizes EOF-delimited bodies and makes a truncated frame an error.
    fn decode_inner_eof(&mut self, src: &mut BytesMut) -> Result<Option<Response>, HttpError> {
        if matches!(&self.state, ClientDecodeState::Poisoned) {
            return Err(HttpError::BadHeader);
        }
        if !matches!(&self.state, ClientDecodeState::Eof { .. }) {
            match crate::codec::Decoder::decode(self, src)? {
                Some(frame) => return Ok(Some(frame)),
                // decode() may have just parsed the head and transitioned into
                // the EOF-delimited-body state within this very call. EOF
                // terminates that body, so fall through and emit the response.
                // (Previously this path reported UnexpectedEof — or silently
                // returned None when src was empty — even though the message
                // was complete.)
                None if matches!(&self.state, ClientDecodeState::Eof { .. }) => {}
                None if src.is_empty() => return Ok(None),
                None => {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "incomplete frame at EOF",
                    )
                    .into());
                }
            }
        }
        let old = std::mem::replace(&mut self.state, ClientDecodeState::Head);
        let ClientDecodeState::Eof {
            version,
            status,
            reason,
            headers,
        } = old
        else {
            unreachable!()
        };
        if src.len() > self.max_body_size {
            return Err(HttpError::BodyTooLarge);
        }
        // Everything buffered at EOF is the body.
        let body = src.split_to(src.len()).to_vec();
        Ok(Some(Response {
            version,
            status,
            reason,
            headers,
            body,
            trailers: Vec::new(),
        }))
    }
}
impl crate::codec::Encoder<Request> for Http1ClientCodec {
    type Error = HttpError;
    /// Serializes `req` into `dst` as an HTTP/1.x request.
    ///
    /// Framing rules enforced here:
    /// * `Transfer-Encoding: chunked` and `Content-Length` are mutually
    ///   exclusive; trailers are only allowed with chunked encoding.
    /// * A declared `Content-Length` must match the actual body length.
    /// * A `Content-Length` header is synthesized when the body is non-empty
    ///   (or the method is POST/PUT) and the caller didn't provide one.
    ///
    /// # Errors
    ///
    /// `BadRequestLine` / `BadMethod` for CR/LF/whitespace injection in the
    /// URI or method, `AmbiguousBodyLength`, `TrailersNotAllowed`,
    /// `BadContentLength`, or header-validation errors.
    fn encode(&mut self, req: Request, dst: &mut BytesMut) -> Result<(), HttpError> {
        // Reject request-line injection via the URI.
        if req.uri.contains('\r')
            || req.uri.contains('\n')
            || req.uri.contains(' ')
            || req.uri.contains('\t')
        {
            return Err(HttpError::BadRequestLine);
        }
        // Same for the method token.
        if req.method.as_str().contains('\r')
            || req.method.as_str().contains('\n')
            || req.method.as_str().contains(' ')
            || req.method.as_str().contains('\t')
        {
            return Err(HttpError::BadMethod);
        }
        let te = unique_header_value(&req.headers, "Transfer-Encoding")?;
        let cl = unique_header_value(&req.headers, "Content-Length")?;
        let chunked = match te {
            Some(value) => {
                // Only "chunked" is supported as a transfer encoding.
                require_transfer_encoding_chunked(value)?;
                true
            }
            None => false,
        };
        if chunked && cl.is_some() {
            return Err(HttpError::AmbiguousBodyLength);
        }
        if !chunked && !req.trailers.is_empty() {
            return Err(HttpError::TrailersNotAllowed);
        }
        if !chunked {
            if let Some(cl) = cl {
                // A caller-supplied Content-Length must agree with the body.
                let declared: usize = cl.trim().parse().map_err(|_| HttpError::BadContentLength)?;
                if declared != req.body.len() {
                    return Err(HttpError::BadContentLength);
                }
            }
        }
        let mut has_content_length = false;
        for (name, value) in &req.headers {
            validate_header_field(name, value)?;
            if name.eq_ignore_ascii_case("content-length") {
                has_content_length = true;
            }
        }
        if chunked {
            for (name, value) in &req.trailers {
                validate_header_field(name, value)?;
            }
        }
        // Reserve once for head + headers + body to avoid repeated growth.
        let headers_bytes: usize = req.headers.iter().map(|(n, v)| n.len() + v.len() + 4).sum();
        dst.reserve(64 + req.uri.len() + headers_bytes + req.body.len());
        // Request line: METHOD SP URI SP VERSION CRLF.
        dst.extend_from_slice(req.method.as_str().as_bytes());
        dst.extend_from_slice(b" ");
        dst.extend_from_slice(req.uri.as_bytes());
        dst.extend_from_slice(b" ");
        dst.extend_from_slice(req.version.as_str().as_bytes());
        dst.extend_from_slice(b"\r\n");
        for (name, value) in &req.headers {
            dst.extend_from_slice(name.as_bytes());
            dst.extend_from_slice(b": ");
            dst.extend_from_slice(value.as_bytes());
            dst.extend_from_slice(b"\r\n");
        }
        if chunked {
            // End of headers, then the body as a single chunk (if any),
            // the zero-size terminating chunk, trailers, and final CRLF.
            dst.extend_from_slice(b"\r\n");
            if !req.body.is_empty() {
                append_chunk_size_line(dst, req.body.len());
                dst.extend_from_slice(&req.body);
                dst.extend_from_slice(b"\r\n");
            }
            dst.extend_from_slice(b"0\r\n");
            for (name, value) in &req.trailers {
                dst.extend_from_slice(name.as_bytes());
                dst.extend_from_slice(b": ");
                dst.extend_from_slice(value.as_bytes());
                dst.extend_from_slice(b"\r\n");
            }
            dst.extend_from_slice(b"\r\n");
            return Ok(());
        }
        if !has_content_length {
            if !req.body.is_empty() {
                dst.extend_from_slice(b"Content-Length: ");
                append_decimal(dst, req.body.len());
                dst.extend_from_slice(b"\r\n");
            } else if req.method == Method::Post || req.method == Method::Put {
                // POST/PUT conventionally carry an explicit zero length.
                dst.extend_from_slice(b"Content-Length: 0\r\n");
            }
        }
        dst.extend_from_slice(b"\r\n");
        if !req.body.is_empty() {
            dst.extend_from_slice(&req.body);
        }
        Ok(())
    }
}
/// Stateless namespace for one-shot and streaming HTTP/1.x client requests.
pub struct Http1Client;
impl Http1Client {
    /// Sends `req` and buffers the complete response, discarding the transport.
    pub async fn request<T>(io: T, req: Request) -> Result<Response, HttpError>
    where
        T: AsyncRead + AsyncWrite + Unpin,
    {
        let (response, _io) = Self::request_with_io(io, req).await?;
        Ok(response)
    }
    /// Like [`Self::request`] but also returns the transport so the caller
    /// can reuse the connection.
    pub async fn request_with_io<T>(io: T, req: Request) -> Result<(Response, T), HttpError>
    where
        T: AsyncRead + AsyncWrite + Unpin,
    {
        Self::request_with_io_and_max_body_size(io, req, DEFAULT_MAX_BODY_SIZE).await
    }
    /// Buffered request with a caller-supplied response-body size cap.
    ///
    /// # Errors
    ///
    /// Besides transport and protocol errors, returns
    /// `PrefetchedDataRemaining` if extra bytes were read past the end of the
    /// response (a reused connection would be desynchronized).
    pub async fn request_with_io_and_max_body_size<T>(
        io: T,
        req: Request,
        max_body_size: usize,
    ) -> Result<(Response, T), HttpError>
    where
        T: AsyncRead + AsyncWrite + Unpin,
    {
        let mut streaming =
            Self::request_streaming_with_max_body_size(io, req, max_body_size).await?;
        let mut response = Response {
            version: streaming.head.version,
            status: streaming.head.status,
            reason: streaming.head.reason,
            headers: streaming.head.headers,
            body: Vec::new(),
            trailers: Vec::new(),
        };
        // Drain the streaming body into the buffered response.
        while let Some(frame) = poll_fn(|cx| Pin::new(&mut streaming.body).poll_frame(cx)).await {
            match frame? {
                Frame::Data(mut buf) => {
                    while buf.has_remaining() {
                        let chunk = buf.chunk();
                        response.body.extend_from_slice(chunk);
                        buf.advance(chunk.len());
                    }
                }
                Frame::Trailers(trailers) => {
                    for (name, value) in trailers.iter() {
                        // Non-UTF-8 trailer values are converted lossily.
                        let value = value.to_str().map_or_else(
                            |_| String::from_utf8_lossy(value.as_bytes()).into_owned(),
                            std::borrow::ToOwned::to_owned,
                        );
                        response.trailers.push((name.as_str().to_string(), value));
                    }
                }
            }
        }
        let (io, prefetched) = streaming.body.into_inner_with_buffer();
        // Any bytes left over belong to no message; reusing the connection
        // would desynchronize it, so surface them as an error.
        if !prefetched.is_empty() {
            return Err(HttpError::PrefetchedDataRemaining(prefetched.len()));
        }
        Ok((response, io))
    }
    /// Sends `req` and returns the response head plus a streaming body.
    pub async fn request_streaming<T>(
        io: T,
        req: Request,
    ) -> Result<ClientStreamingResponse<T>, HttpError>
    where
        T: AsyncRead + AsyncWrite + Unpin,
    {
        Self::request_streaming_with_max_body_size(io, req, DEFAULT_MAX_BODY_SIZE).await
    }
    /// Streaming request with a caller-supplied response-body size cap.
    ///
    /// Implements the `Expect: 100-continue` handshake: when the request
    /// carries that header, only the head is written up front and the body is
    /// sent after the server answers `100 Continue`. Other non-101
    /// informational responses are skipped; `101` is returned as the final
    /// head (upgrade), with any prefetched bytes kept in the body's buffer.
    pub async fn request_streaming_with_max_body_size<T>(
        mut io: T,
        req: Request,
        max_body_size: usize,
    ) -> Result<ClientStreamingResponse<T>, HttpError>
    where
        T: AsyncRead + AsyncWrite + Unpin,
    {
        // The request method influences response framing (HEAD has no body).
        let request_method = req.method.clone();
        let expect_continue = unique_header_value(&req.headers, "Expect")?
            .is_some_and(|value| value.trim().eq_ignore_ascii_case("100-continue"));
        let mut codec = Http1ClientCodec::new();
        let mut write_buf = BytesMut::with_capacity(1024);
        codec.encode(req, &mut write_buf)?;
        // Split the encoded request into head and body so the body can be
        // withheld until 100 Continue arrives.
        let header_end = find_headers_end(write_buf.as_ref()).ok_or(HttpError::BadRequestLine)?;
        let (head_bytes, body_bytes) = write_buf.as_ref().split_at(header_end);
        let mut request_body_sent = !expect_continue || body_bytes.is_empty();
        if expect_continue {
            io.write_all(head_bytes).await?;
        } else {
            io.write_all(write_buf.as_ref()).await?;
        }
        io.flush().await?;
        let mut read_buf = BytesMut::with_capacity(8192);
        let mut scratch = [0u8; 8192];
        loop {
            if let Some(end) = find_headers_end(read_buf.as_ref()) {
                if end > DEFAULT_MAX_HEADERS_SIZE {
                    return Err(HttpError::HeadersTooLarge);
                }
                let head_bytes = read_buf.split_to(end);
                let head_str = std::str::from_utf8(head_bytes.as_ref())
                    .map_err(|_| HttpError::BadRequestLine)?;
                let mut lines = head_str.split("\r\n");
                let status_line = lines.next().ok_or(HttpError::BadRequestLine)?;
                let (version, status, reason) = parse_status_line(status_line)?;
                let mut headers = Vec::new();
                for line in lines {
                    if line.is_empty() {
                        break;
                    }
                    headers.push(parse_header_line(line)?);
                    if headers.len() > MAX_HEADERS {
                        return Err(HttpError::TooManyHeaders);
                    }
                }
                // Skip interim 1xx responses; 101 (upgrade) is final.
                if (100..=199).contains(&status) && status != 101 {
                    if status == 100 && !request_body_sent {
                        // Server granted the expectation: send the body now.
                        io.write_all(body_bytes).await?;
                        io.flush().await?;
                        request_body_sent = true;
                    }
                    continue;
                }
                let kind = response_body_kind(&headers, status, &request_method, max_body_size)?;
                let head = crate::http::h1::stream::ResponseHead {
                    version,
                    status,
                    reason,
                    headers,
                };
                // Bytes already read past the head are handed to the body
                // reader so nothing is lost (important for upgrades).
                let body_buf = read_buf;
                let body =
                    ClientIncomingBody::with_max_body_size(io, kind, body_buf, max_body_size);
                return Ok(ClientStreamingResponse { head, body });
            }
            if read_buf.len() > DEFAULT_MAX_HEADERS_SIZE {
                return Err(HttpError::HeadersTooLarge);
            }
            // Read more bytes; scratch is copied into read_buf below.
            let n = poll_fn(|cx| {
                let mut rb = ReadBuf::new(&mut scratch);
                match Pin::new(&mut io).poll_read(cx, &mut rb) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(Ok(())) => Poll::Ready(Ok(rb.filled().len())),
                    Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                }
            })
            .await?;
            if n == 0 {
                return Err(HttpError::Io(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "connection closed before response headers",
                )));
            }
            read_buf.extend_from_slice(&scratch[..n]);
        }
    }
}
/// A parsed response head plus a lazily-read body over the transport `T`.
#[derive(Debug)]
pub struct ClientStreamingResponse<T> {
    pub head: crate::http::h1::stream::ResponseHead,
    pub body: ClientIncomingBody<T>,
}
/// How the response body is framed on the wire (see `response_body_kind`).
#[derive(Debug, Clone)]
enum ClientBodyKind {
    /// No body (1xx/204/304, HEAD responses, or Content-Length: 0).
    Empty,
    /// Content-Length framing; `remaining` counts bytes still expected.
    ContentLength {
        remaining: u64,
    },
    /// Chunked transfer coding, decoded incrementally by `ChunkedReadState`.
    Chunked {
        state: ChunkedReadState,
        trailers: HeaderMap,
        // Running total of trailer bytes, checked against the trailer limit.
        trailers_bytes: usize,
    },
    /// Body delimited by connection close.
    Eof,
}
/// Sub-state for incremental chunked-body decoding:
/// SizeLine -> Data -> DataCrlf -> SizeLine ... -> Trailers -> Done.
#[derive(Debug, Clone, Copy)]
enum ChunkedReadState {
    /// Expecting a `chunk-size [; extensions] CRLF` line.
    SizeLine,
    /// Reading chunk payload; `remaining` bytes left in this chunk.
    Data { remaining: usize },
    /// Expecting the CRLF that terminates a chunk's payload.
    DataCrlf,
    /// Reading trailer header lines after the zero-size chunk.
    Trailers,
    /// Body fully consumed.
    Done,
}
/// Streaming response body: incrementally reads from `io` and yields
/// data/trailer frames via the [`Body`] trait.
#[derive(Debug)]
pub struct ClientIncomingBody<T> {
    io: T,
    // Bytes read from `io` but not yet yielded; may contain post-body bytes
    // (e.g. after an upgrade), recoverable via `into_inner_with_buffer`.
    buffer: BytesMut,
    kind: ClientBodyKind,
    // Set once the body (and trailers, if any) has been fully yielded,
    // or after an error.
    done: bool,
    // Total body bytes yielded so far, checked against `max_body_size`.
    received: u64,
    size_hint: SizeHint,
    // Upper bound on the size of a single yielded data frame.
    max_chunk_size: usize,
    max_body_size: u64,
    max_trailers_size: usize,
    // Cap on `buffer` growth while waiting for a parseable unit.
    max_buffered_bytes: usize,
}
impl<T> ClientIncomingBody<T> {
    // Per-frame and buffering limits; distinct from the codec's defaults.
    const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024;
    const DEFAULT_MAX_TRAILERS_SIZE: usize = 16 * 1024;
    const DEFAULT_MAX_BUFFERED_BYTES: usize = 256 * 1024;
    /// Builds a body reader over `io` with `buffer` holding any bytes already
    /// read past the response head.
    fn with_max_body_size(
        io: T,
        kind: ClientBodyKind,
        buffer: BytesMut,
        max_body_size: usize,
    ) -> Self {
        let size_hint = match &kind {
            ClientBodyKind::Empty => SizeHint::with_exact(0),
            ClientBodyKind::ContentLength { remaining } => SizeHint::with_exact(*remaining),
            // Chunked and EOF-delimited bodies have unknown length.
            ClientBodyKind::Chunked { .. } | ClientBodyKind::Eof => SizeHint::default(),
        };
        Self {
            io,
            buffer,
            // An Empty body is finished before the first poll.
            done: matches!(kind, ClientBodyKind::Empty),
            kind,
            received: 0,
            size_hint,
            max_chunk_size: Self::DEFAULT_MAX_CHUNK_SIZE,
            max_body_size: u64::try_from(max_body_size).unwrap_or(u64::MAX),
            max_trailers_size: Self::DEFAULT_MAX_TRAILERS_SIZE,
            max_buffered_bytes: Self::DEFAULT_MAX_BUFFERED_BYTES,
        }
    }
    /// Returns the transport plus any prefetched bytes not consumed by the
    /// body (needed for protocol upgrades and connection-reuse checks).
    #[must_use]
    pub fn into_inner_with_buffer(self) -> (T, BytesMut) {
        (self.io, self.buffer)
    }
    /// Returns just the transport, dropping any prefetched bytes.
    #[must_use]
    pub fn into_inner(self) -> T {
        self.into_inner_with_buffer().0
    }
    /// Attempts to produce the next frame from already-buffered bytes.
    /// `Ok(None)` means "need more input" (caller reads more from `io`).
    fn try_decode_frame(&mut self) -> Result<Option<Frame<BytesCursor>>, HttpError> {
        if self.done {
            return Ok(None);
        }
        // Temporarily take `kind` so the helpers can borrow `self` mutably
        // alongside the state; it is restored unconditionally below.
        let mut kind = std::mem::replace(&mut self.kind, ClientBodyKind::Empty);
        let result = match &mut kind {
            ClientBodyKind::Empty => {
                self.done = true;
                Ok(None)
            }
            ClientBodyKind::ContentLength { remaining } => {
                self.try_decode_content_length_frame(remaining)
            }
            ClientBodyKind::Eof => self.try_decode_eof_frame(),
            ClientBodyKind::Chunked {
                state,
                trailers,
                trailers_bytes,
            } => self.try_decode_chunked_frame(state, trailers, trailers_bytes),
        };
        self.kind = kind;
        result
    }
    /// Yields up to `max_chunk_size` bytes of a Content-Length body.
    fn try_decode_content_length_frame(
        &mut self,
        remaining: &mut u64,
    ) -> Result<Option<Frame<BytesCursor>>, HttpError> {
        if *remaining == 0 {
            self.done = true;
            return Ok(None);
        }
        if self.buffer.is_empty() {
            return Ok(None);
        }
        let max = usize::try_from(*remaining).unwrap_or(usize::MAX);
        let to_yield = self.buffer.len().min(max).min(self.max_chunk_size);
        let chunk = self.buffer.split_to(to_yield);
        *remaining = remaining.saturating_sub(to_yield as u64);
        self.received = self.received.saturating_add(to_yield as u64);
        if self.received > self.max_body_size {
            return Err(HttpError::BodyTooLarge);
        }
        if *remaining == 0 {
            self.done = true;
        }
        Ok(Some(Frame::Data(BytesCursor::new(chunk.freeze()))))
    }
    /// Yields buffered bytes of an EOF-delimited body; completion is decided
    /// by poll_frame when the transport reports EOF.
    fn try_decode_eof_frame(&mut self) -> Result<Option<Frame<BytesCursor>>, HttpError> {
        if self.buffer.is_empty() {
            return Ok(None);
        }
        let to_yield = self.buffer.len().min(self.max_chunk_size);
        let chunk = self.buffer.split_to(to_yield);
        self.received = self.received.saturating_add(to_yield as u64);
        if self.received > self.max_body_size {
            return Err(HttpError::BodyTooLarge);
        }
        Ok(Some(Frame::Data(BytesCursor::new(chunk.freeze()))))
    }
    /// Incrementally decodes chunked framing, yielding data frames as chunk
    /// payload arrives and a final trailers frame if trailers were present.
    #[allow(clippy::too_many_lines)]
    fn try_decode_chunked_frame(
        &mut self,
        state: &mut ChunkedReadState,
        trailers: &mut HeaderMap,
        trailers_bytes: &mut usize,
    ) -> Result<Option<Frame<BytesCursor>>, HttpError> {
        loop {
            match *state {
                ChunkedReadState::SizeLine => {
                    // Need a complete `size[;ext]CRLF` line before proceeding.
                    let line_end = self.buffer.as_ref().windows(2).position(|w| w == b"\r\n");
                    let Some(line_end) = line_end else {
                        return Ok(None);
                    };
                    let line = &self.buffer.as_ref()[..line_end];
                    let line_str =
                        std::str::from_utf8(line).map_err(|_| HttpError::BadChunkedEncoding)?;
                    // Chunk extensions after ';' are ignored.
                    let size_part = line_str.split(';').next().unwrap_or("");
                    if size_part.is_empty() {
                        return Err(HttpError::BadChunkedEncoding);
                    }
                    // Reject whitespace padding around the hex size, which
                    // from_str_radix would otherwise error on inconsistently.
                    if size_part
                        .as_bytes()
                        .first()
                        .is_some_and(u8::is_ascii_whitespace)
                        || size_part
                            .as_bytes()
                            .last()
                            .is_some_and(u8::is_ascii_whitespace)
                    {
                        return Err(HttpError::BadChunkedEncoding);
                    }
                    let chunk_size = usize::from_str_radix(size_part, 16)
                        .map_err(|_| HttpError::BadChunkedEncoding)?;
                    let _ = self.buffer.split_to(line_end + 2);
                    if chunk_size == 0 {
                        // Zero-size chunk terminates the body; trailers follow.
                        *state = ChunkedReadState::Trailers;
                        *trailers = HeaderMap::new();
                        *trailers_bytes = 0;
                    } else {
                        *state = ChunkedReadState::Data {
                            remaining: chunk_size,
                        };
                    }
                }
                ChunkedReadState::Data { remaining } => {
                    if self.buffer.is_empty() {
                        return Ok(None);
                    }
                    // A wire chunk may be yielded as several frames, each
                    // capped at max_chunk_size.
                    let to_yield = self.buffer.len().min(remaining).min(self.max_chunk_size);
                    let chunk = self.buffer.split_to(to_yield);
                    let next = remaining.saturating_sub(to_yield);
                    *state = if next == 0 {
                        ChunkedReadState::DataCrlf
                    } else {
                        ChunkedReadState::Data { remaining: next }
                    };
                    self.received = self.received.saturating_add(to_yield as u64);
                    if self.received > self.max_body_size {
                        return Err(HttpError::BodyTooLarge);
                    }
                    return Ok(Some(Frame::Data(BytesCursor::new(chunk.freeze()))));
                }
                ChunkedReadState::DataCrlf => {
                    // Each chunk payload must be followed by exactly CRLF.
                    if self.buffer.len() < 2 {
                        return Ok(None);
                    }
                    if self.buffer.as_ref()[0] != b'\r' || self.buffer.as_ref()[1] != b'\n' {
                        return Err(HttpError::BadChunkedEncoding);
                    }
                    let _ = self.buffer.split_to(2);
                    *state = ChunkedReadState::SizeLine;
                }
                ChunkedReadState::Trailers => {
                    let line_end = self.buffer.as_ref().windows(2).position(|w| w == b"\r\n");
                    let Some(line_end) = line_end else {
                        // No complete line yet: still enforce the trailer cap
                        // against what has been buffered so far.
                        if *trailers_bytes + self.buffer.len() > self.max_trailers_size {
                            return Err(HttpError::HeadersTooLarge);
                        }
                        return Ok(None);
                    };
                    let line = self.buffer.split_to(line_end);
                    let _ = self.buffer.split_to(2);
                    if line.is_empty() {
                        // Blank line ends the trailer section (and the body).
                        self.done = true;
                        *state = ChunkedReadState::Done;
                        if !trailers.is_empty() {
                            return Ok(Some(Frame::Trailers(std::mem::take(trailers))));
                        }
                        return Ok(None);
                    }
                    *trailers_bytes = trailers_bytes.saturating_add(line.len() + 2);
                    if *trailers_bytes > self.max_trailers_size {
                        return Err(HttpError::HeadersTooLarge);
                    }
                    let line_str =
                        std::str::from_utf8(line.as_ref()).map_err(|_| HttpError::BadHeader)?;
                    let Some(colon) = line_str.find(':') else {
                        return Err(HttpError::BadHeader);
                    };
                    let name = line_str[..colon].trim();
                    let value = line_str[colon + 1..].trim();
                    validate_header_field(name, value)?;
                    trailers.append(
                        HeaderName::from_string(name),
                        HeaderValue::from_bytes(value.as_bytes()),
                    );
                }
                ChunkedReadState::Done => return Ok(None),
            }
        }
    }
}
impl<T> Body for ClientIncomingBody<T>
where
    T: AsyncRead + Unpin,
{
    type Data = BytesCursor;
    type Error = HttpError;
    /// Drives the body: decode from the buffer first, then read more from the
    /// transport and retry. Any error marks the body done (terminal).
    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        loop {
            match self.try_decode_frame() {
                Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))),
                // Need more bytes: fall through to the read below.
                Ok(None) => {}
                Err(e) => {
                    self.done = true;
                    return Poll::Ready(Some(Err(e)));
                }
            }
            if self.done {
                return Poll::Ready(None);
            }
            let mut scratch = [0u8; 8192];
            let mut rb = ReadBuf::new(&mut scratch);
            match Pin::new(&mut self.io).poll_read(cx, &mut rb) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Ok(())) => {
                    let n = rb.filled().len();
                    if n == 0 {
                        // Transport EOF: whether that is valid depends on the
                        // framing of the body still in progress.
                        match &self.kind {
                            // Short Content-Length body: truncated response.
                            ClientBodyKind::ContentLength { remaining } if *remaining != 0 => {
                                self.done = true;
                                return Poll::Ready(Some(Err(HttpError::BadContentLength)));
                            }
                            // Chunked body missing its terminating chunk.
                            ClientBodyKind::Chunked { .. } => {
                                self.done = true;
                                return Poll::Ready(Some(Err(HttpError::BadChunkedEncoding)));
                            }
                            // EOF-delimited (or finished) bodies end cleanly.
                            _ => {
                                self.done = true;
                                return Poll::Ready(None);
                            }
                        }
                    }
                    self.buffer.extend_from_slice(&scratch[..n]);
                    // Guard against unbounded buffering while waiting for a
                    // parseable unit (e.g. an endless chunk-size line).
                    if self.buffer.len() > self.max_buffered_bytes {
                        self.done = true;
                        return Poll::Ready(Some(Err(HttpError::BodyTooLarge)));
                    }
                }
                Poll::Ready(Err(e)) => {
                    self.done = true;
                    return Poll::Ready(Some(Err(HttpError::Io(e))));
                }
            }
        }
    }
    fn is_end_stream(&self) -> bool {
        self.done
    }
    fn size_hint(&self) -> SizeHint {
        self.size_hint
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bytes::{Buf, BytesMut};
use crate::codec::Decoder;
use std::pin::Pin;
use std::task::{Context, Waker};
#[test]
fn decode_simple_response() {
let mut codec = Http1ClientCodec::new();
let mut buf = BytesMut::from(&b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello"[..]);
let resp = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(resp.status, 200);
assert_eq!(resp.reason, "OK");
assert_eq!(resp.version, Version::Http11);
assert_eq!(resp.body, b"hello");
assert!(resp.trailers.is_empty());
}
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
fn block_on<F: std::future::Future>(f: F) -> F::Output {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let mut pinned = std::pin::pin!(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
fn poll_body<B: Body + Unpin>(body: &mut B) -> Option<Result<Frame<B::Data>, B::Error>> {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
loop {
match Pin::new(&mut *body).poll_frame(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
#[derive(Debug)]
struct TestIo {
read: std::io::Cursor<Vec<u8>>,
written: Vec<u8>,
}
impl TestIo {
fn new(read_bytes: &[u8]) -> Self {
Self {
read: std::io::Cursor::new(read_bytes.to_vec()),
written: Vec::new(),
}
}
}
impl AsyncRead for TestIo {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let dst = buf.unfilled();
let n = std::io::Read::read(&mut self.read, dst)?;
buf.advance(n);
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for TestIo {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
src: &[u8],
) -> Poll<std::io::Result<usize>> {
self.written.extend_from_slice(src);
Poll::Ready(Ok(src.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}
#[derive(Debug)]
struct ExpectContinueIo {
read: std::io::Cursor<Vec<u8>>,
writes: Vec<Vec<u8>>,
}
impl ExpectContinueIo {
fn new(read_bytes: &[u8]) -> Self {
Self {
read: std::io::Cursor::new(read_bytes.to_vec()),
writes: Vec::new(),
}
}
}
impl AsyncRead for ExpectContinueIo {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let dst = buf.unfilled();
let n = std::io::Read::read(&mut self.read, dst)?;
buf.advance(n);
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for ExpectContinueIo {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
src: &[u8],
) -> Poll<std::io::Result<usize>> {
self.writes.push(src.to_vec());
Poll::Ready(Ok(src.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}
#[test]
fn request_streaming_content_length() {
let response_bytes = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
let io = TestIo::new(response_bytes);
let req = Request {
method: Method::Get,
uri: "/".to_string(),
version: Version::Http11,
headers: vec![("Host".to_string(), "example.com".to_string())],
body: Vec::new(),
trailers: Vec::new(),
peer_addr: None,
};
let mut resp = block_on(Http1Client::request_streaming(io, req)).expect("streaming resp");
assert_eq!(resp.head.status, 200);
let mut collected = Vec::new();
while let Some(frame) = poll_body(&mut resp.body) {
let frame = frame.expect("frame ok");
match frame {
Frame::Data(mut buf) => {
while buf.has_remaining() {
let chunk = buf.chunk();
collected.extend_from_slice(chunk);
buf.advance(chunk.len());
}
}
Frame::Trailers(_) => {}
}
}
assert_eq!(collected, b"hello");
}
#[test]
fn request_streaming_chunked_with_trailers() {
let response_bytes = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nhello\r\n0\r\nFoo: Bar\r\n\r\n";
let io = TestIo::new(response_bytes);
let req = Request {
method: Method::Get,
uri: "/".to_string(),
version: Version::Http11,
headers: vec![("Host".to_string(), "example.com".to_string())],
body: Vec::new(),
trailers: Vec::new(),
peer_addr: None,
};
let mut resp = block_on(Http1Client::request_streaming(io, req)).expect("streaming resp");
assert_eq!(resp.head.status, 200);
let mut data = Vec::new();
let mut saw_trailers = false;
while let Some(frame) = poll_body(&mut resp.body) {
let frame = frame.expect("frame ok");
match frame {
Frame::Data(mut buf) => {
while buf.has_remaining() {
let chunk = buf.chunk();
data.extend_from_slice(chunk);
buf.advance(chunk.len());
}
}
Frame::Trailers(trailers) => {
saw_trailers = true;
let foo = trailers.get(&HeaderName::from_static("foo")).unwrap();
assert_eq!(foo.as_bytes(), b"Bar");
}
}
}
assert_eq!(data, b"hello");
assert!(saw_trailers);
}
#[test]
fn request_head_response_ignores_content_length_body() {
let response_bytes = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\n";
let io = TestIo::new(response_bytes);
let req = Request {
method: Method::Head,
uri: "/".to_string(),
version: Version::Http11,
headers: vec![("Host".to_string(), "example.com".to_string())],
body: Vec::new(),
trailers: Vec::new(),
peer_addr: None,
};
let resp = block_on(Http1Client::request(io, req)).expect("head response");
assert_eq!(resp.status, 200);
assert!(resp.body.is_empty());
assert!(resp.trailers.is_empty());
}
#[test]
fn request_streaming_skips_informational_response() {
let response_bytes =
b"HTTP/1.1 100 Continue\r\n\r\nHTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
let io = TestIo::new(response_bytes);
let req = Request {
method: Method::Post,
uri: "/upload".to_string(),
version: Version::Http11,
headers: vec![("Host".to_string(), "example.com".to_string())],
body: b"data".to_vec(),
trailers: Vec::new(),
peer_addr: None,
};
let mut resp = block_on(Http1Client::request_streaming(io, req)).expect("streaming resp");
assert_eq!(resp.head.status, 200);
let mut collected = Vec::new();
while let Some(frame) = poll_body(&mut resp.body) {
let frame = frame.expect("frame ok");
if let Frame::Data(mut buf) = frame {
while buf.has_remaining() {
let chunk = buf.chunk();
collected.extend_from_slice(chunk);
buf.advance(chunk.len());
}
}
}
assert_eq!(collected, b"hello");
}
#[test]
fn request_streaming_expect_continue_sends_body_only_after_continue() {
let response_bytes =
b"HTTP/1.1 100 Continue\r\n\r\nHTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok";
let io = ExpectContinueIo::new(response_bytes);
let req = Request {
method: Method::Post,
uri: "/upload".to_string(),
version: Version::Http11,
headers: vec![
("Host".to_string(), "example.com".to_string()),
("Expect".to_string(), "100-continue".to_string()),
],
body: b"hello".to_vec(),
trailers: Vec::new(),
peer_addr: None,
};
let resp = block_on(Http1Client::request_with_io(io, req)).expect("response");
assert_eq!(resp.0.status, 200);
assert_eq!(resp.0.body, b"ok");
let writes = resp.1.writes;
assert!(
writes.len() >= 2,
"expect-continue flow should split head and body writes"
);
let first_write = String::from_utf8(writes[0].clone()).expect("headers should be utf8");
assert!(first_write.contains("Expect: 100-continue\r\n"));
assert!(
first_write.ends_with("\r\n\r\n"),
"first write should contain only request head"
);
assert!(
!first_write.contains("hello"),
"request body must not be sent before 100 Continue"
);
let body_bytes = writes[1..].concat();
assert_eq!(body_bytes, b"hello");
}
#[test]
fn request_streaming_expect_continue_skips_body_on_early_final_response() {
    // The server rejects the expectation outright, so the client must never
    // upload the request body.
    let transport =
        ExpectContinueIo::new(b"HTTP/1.1 417 Expectation Failed\r\nContent-Length: 0\r\n\r\n");
    let request = Request {
        method: Method::Post,
        uri: String::from("/upload"),
        version: Version::Http11,
        headers: vec![
            (String::from("Host"), String::from("example.com")),
            (String::from("Expect"), String::from("100-continue")),
        ],
        body: b"hello".to_vec(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let (response, transport) =
        block_on(Http1Client::request_with_io(transport, request)).expect("response");
    assert_eq!(response.status, 417);
    let writes = transport.writes;
    assert_eq!(
        writes.len(),
        1,
        "early final response must suppress body upload"
    );
    let head_write = String::from_utf8(writes[0].clone()).expect("headers should be utf8");
    assert!(head_write.contains("Expect: 100-continue\r\n"));
    assert!(
        head_write.ends_with("\r\n\r\n"),
        "only the request head should be written"
    );
    assert!(
        !head_write.contains("hello"),
        "request body must not be sent after early final response"
    );
}
#[test]
fn request_streaming_upgrade_preserves_prefetched_bytes() {
    // Two bytes of post-upgrade frame data arrive in the same read as the 101 head.
    let wire = b"HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\n\x81\x00";
    let transport = TestIo::new(wire);
    let request = Request {
        method: Method::Get,
        uri: String::from("/ws"),
        version: Version::Http11,
        headers: vec![
            (String::from("Host"), String::from("example.com")),
            (String::from("Connection"), String::from("Upgrade")),
            (String::from("Upgrade"), String::from("websocket")),
        ],
        body: Vec::new(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let streaming = block_on(Http1Client::request_streaming(transport, request)).expect("streaming resp");
    assert_eq!(streaming.head.status, 101);
    // The over-read bytes must be handed back alongside the transport.
    let (mut transport, prefetched) = streaming.body.into_inner_with_buffer();
    assert_eq!(prefetched.as_ref(), b"\x81\x00");
    // Nothing further should remain on the wire once the buffer is surrendered.
    let mut tail = [0u8; 8];
    let read_len = std::io::Read::read(&mut transport.read, &mut tail).expect("cursor read");
    assert_eq!(read_len, 0);
}
#[test]
fn request_skips_informational_response() {
    // A 103 Early Hints interim response precedes the real 200; only the final
    // response should be surfaced to the caller.
    let transport = TestIo::new(
        b"HTTP/1.1 103 Early Hints\r\nLink: </a.css>; rel=preload\r\n\r\nHTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
    );
    let request = Request {
        method: Method::Get,
        uri: String::from("/"),
        version: Version::Http11,
        headers: vec![(String::from("Host"), String::from("example.com"))],
        body: Vec::new(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let response = block_on(Http1Client::request(transport, request)).expect("response");
    assert_eq!(response.status, 200);
    assert_eq!(response.body, b"ok");
}
#[test]
fn request_with_io_returns_transport_for_reuse() {
    // request_with_io hands the transport back so the connection can be reused.
    let transport = TestIo::new(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok");
    let request = Request {
        method: Method::Get,
        uri: String::from("/reuse"),
        version: Version::Http11,
        headers: vec![(String::from("Host"), String::from("example.com"))],
        body: Vec::new(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let (response, transport) =
        block_on(Http1Client::request_with_io(transport, request)).expect("response");
    assert_eq!(response.status, 200);
    assert_eq!(response.body, b"ok");
    // The returned transport still holds everything the client wrote.
    let sent = String::from_utf8(transport.written).expect("request write should be utf8");
    assert!(sent.starts_with("GET /reuse HTTP/1.1\r\n"));
    assert!(sent.contains("Host: example.com\r\n"));
}
#[test]
fn request_with_io_rejects_unread_prefetched_bytes() {
    // An upgrade response leaves two unread bytes after the head; the buffered
    // API has no way to return them, so it must fail rather than drop them.
    let transport = TestIo::new(
        b"HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\n\x81\x00",
    );
    let request = Request {
        method: Method::Get,
        uri: String::from("/ws"),
        version: Version::Http11,
        headers: vec![
            (String::from("Host"), String::from("example.com")),
            (String::from("Connection"), String::from("Upgrade")),
            (String::from("Upgrade"), String::from("websocket")),
        ],
        body: Vec::new(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let err = block_on(Http1Client::request_with_io(transport, request)).expect_err("must reject");
    match err {
        HttpError::PrefetchedDataRemaining(count) => assert_eq!(count, 2),
        other => panic!("unexpected error: {other:?}"),
    }
}
#[test]
fn request_rejects_unread_prefetched_bytes() {
    // Same as the request_with_io variant, but through the plain request() API:
    // leftover prefetched bytes must surface as an error, not be silently lost.
    let transport = TestIo::new(
        b"HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\n\x81\x00",
    );
    let request = Request {
        method: Method::Get,
        uri: String::from("/ws"),
        version: Version::Http11,
        headers: vec![
            (String::from("Host"), String::from("example.com")),
            (String::from("Connection"), String::from("Upgrade")),
            (String::from("Upgrade"), String::from("websocket")),
        ],
        body: Vec::new(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let err = block_on(Http1Client::request(transport, request)).expect_err("must reject");
    match err {
        HttpError::PrefetchedDataRemaining(count) => assert_eq!(count, 2),
        other => panic!("unexpected error: {other:?}"),
    }
}
#[test]
fn decode_response_no_body() {
    // A 204 carries no body; decoding yields an empty body and no trailers.
    let mut codec = Http1ClientCodec::new();
    let mut input = BytesMut::from(&b"HTTP/1.1 204 No Content\r\n\r\n"[..]);
    let response = codec.decode(&mut input).unwrap().unwrap();
    assert_eq!(response.status, 204);
    assert!(response.body.is_empty());
    assert!(response.trailers.is_empty());
}
#[test]
fn decode_response_incomplete() {
    // Only 3 of the promised 10 body bytes have arrived; decode must return
    // None and wait for more input.
    let mut codec = Http1ClientCodec::new();
    let mut input = BytesMut::from(&b"HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nhel"[..]);
    assert!(codec.decode(&mut input).unwrap().is_none());
}
#[test]
fn encode_request() {
    // A bodyless GET encodes to a request line plus its headers.
    let mut codec = Http1ClientCodec::new();
    let request = Request {
        method: Method::Get,
        uri: "/index.html".to_string(),
        version: Version::Http11,
        headers: vec![("Host".to_string(), "example.com".to_string())],
        body: Vec::new(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let mut out = BytesMut::with_capacity(256);
    crate::codec::Encoder::encode(&mut codec, request, &mut out).unwrap();
    let rendered = String::from_utf8(out.to_vec()).unwrap();
    assert!(rendered.starts_with("GET /index.html HTTP/1.1\r\n"));
    assert!(rendered.contains("Host: example.com\r\n"));
}
#[test]
fn encode_request_with_body() {
    // A non-empty body must produce a matching Content-Length header and be
    // appended directly after the blank line ending the head.
    let mut codec = Http1ClientCodec::new();
    let request = Request {
        method: Method::Post,
        uri: "/api".to_string(),
        version: Version::Http11,
        headers: vec![("Host".to_string(), "api.example.com".to_string())],
        body: b"data".to_vec(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let mut out = BytesMut::with_capacity(256);
    crate::codec::Encoder::encode(&mut codec, request, &mut out).unwrap();
    let rendered = String::from_utf8(out.to_vec()).unwrap();
    assert!(rendered.contains("Content-Length: 4\r\n"));
    assert!(rendered.ends_with("\r\n\r\ndata"));
}
#[test]
fn decode_chunked_response() {
    // Two chunks ("hello" + " world") followed by the terminal zero chunk are
    // reassembled into a single body.
    let mut codec = Http1ClientCodec::new();
    let wire = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nhello\r\n6\r\n world\r\n0\r\n\r\n";
    let mut input = BytesMut::from(&wire[..]);
    let response = codec.decode(&mut input).unwrap().unwrap();
    assert_eq!(response.status, 200);
    assert_eq!(response.body, b"hello world");
    assert!(response.trailers.is_empty());
}
#[test]
fn decode_chunked_response_with_trailers() {
    // Trailers following the terminal zero chunk must be parsed and exposed
    // alongside the reassembled body.
    let mut codec = Http1ClientCodec::new();
    let wire = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nhello\r\n0\r\nX-Trailer: one\r\nY-Trailer: two\r\n\r\n";
    let mut input = BytesMut::from(&wire[..]);
    let response = codec.decode(&mut input).unwrap().unwrap();
    assert_eq!(response.body, b"hello");
    assert_eq!(response.trailers.len(), 2);
    assert_eq!(response.trailers[0].0, "X-Trailer");
    assert_eq!(response.trailers[0].1, "one");
    // Previously only the first trailer was inspected even though the length
    // assertion required two; verify the second trailer's name and value too.
    assert_eq!(response.trailers[1].0, "Y-Trailer");
    assert_eq!(response.trailers[1].1, "two");
}
#[test]
fn streaming_chunked_trailer_limit_does_not_count_terminal_crlf() {
    // "X:" + value + CRLF lands exactly on the 16 KiB trailer budget; the final
    // empty CRLF terminating the trailer section must not be counted against it.
    let trailer_value = "a".repeat(16 * 1024 - 4);
    let mut response_bytes = Vec::new();
    response_bytes.extend_from_slice(b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n");
    response_bytes.extend_from_slice(b"0\r\nX:");
    response_bytes.extend_from_slice(trailer_value.as_bytes());
    response_bytes.extend_from_slice(b"\r\n\r\n");
    let io = TestIo::new(&response_bytes);
    let req = Request {
        method: Method::Get,
        uri: "/".to_string(),
        version: Version::Http11,
        headers: vec![("Host".to_string(), "example.com".to_string())],
        body: Vec::new(),
        trailers: Vec::new(),
        peer_addr: None,
    };
    let mut resp = block_on(Http1Client::request_streaming(io, req)).expect("streaming resp");
    assert_eq!(resp.head.status, 200);
    let mut saw_trailers = false;
    while let Some(frame) = poll_body(&mut resp.body) {
        match frame.expect("frame ok") {
            // BUG FIX: this arm previously did `return`, which exited the test
            // early and skipped `assert!(saw_trailers)` — any data frame made
            // the test pass vacuously. Ignore data frames and keep polling so
            // the trailer assertion below always runs.
            Frame::Data(_) => {}
            Frame::Trailers(trailers) => {
                saw_trailers = true;
                let header = trailers
                    .get(&HeaderName::from_static("x"))
                    .expect("trailer x");
                assert_eq!(header.as_bytes().len(), trailer_value.len());
            }
        }
    }
    assert!(saw_trailers);
}
#[test]
fn decode_response_without_length_is_eof_delimited() {
    // With neither Content-Length nor chunked framing, the body extends to EOF:
    // decode() cannot finish, while decode_eof() yields whatever was buffered.
    let mut codec = Http1ClientCodec::new();
    let mut input = BytesMut::from(&b"HTTP/1.1 200 OK\r\n\r\nhello"[..]);
    assert!(codec.decode(&mut input).unwrap().is_none());
    let response = codec.decode_eof(&mut input).unwrap().unwrap();
    assert_eq!(response.body, b"hello");
}
#[test]
fn decode_headers_too_large() {
    // Head section exceeds the configured 32-byte cap.
    let mut codec = Http1ClientCodec::new().max_headers_size(32);
    let mut input = BytesMut::from(&b"HTTP/1.1 200 OK\r\nX-Large: aaaaaaaaaaaaaaa\r\n\r\n"[..]);
    assert!(matches!(
        codec.decode(&mut input),
        Err(HttpError::HeadersTooLarge)
    ));
}
#[test]
fn decode_body_too_large_content_length() {
    // A declared Content-Length of 100 must be rejected up front against the
    // 10-byte cap, before any body bytes arrive.
    let mut codec = Http1ClientCodec::new().max_body_size(10);
    let mut input = BytesMut::from(&b"HTTP/1.1 200 OK\r\nContent-Length: 100\r\n\r\n"[..]);
    assert!(matches!(
        codec.decode(&mut input),
        Err(HttpError::BodyTooLarge)
    ));
}
#[test]
fn decode_body_too_large_chunked() {
    // Chunk size 0x14 (20 bytes) overflows the 10-byte body cap.
    let mut codec = Http1ClientCodec::new().max_body_size(10);
    let wire = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n14\r\n01234567890123456789\r\n0\r\n\r\n";
    let mut input = BytesMut::from(&wire[..]);
    assert!(matches!(
        codec.decode(&mut input),
        Err(HttpError::BodyTooLarge)
    ));
}
#[test]
fn decode_body_at_limit_succeeds() {
    // A body exactly at max_body_size passes: the limit is inclusive.
    let mut codec = Http1ClientCodec::new().max_body_size(5);
    let mut input = BytesMut::from(&b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello"[..]);
    let response = codec.decode(&mut input).unwrap().unwrap();
    assert_eq!(response.body, b"hello");
}
#[test]
fn reject_both_content_length_and_transfer_encoding() {
    // Carrying both framing headers makes the body length ambiguous (a classic
    // request-smuggling vector); the codec must refuse to guess.
    let mut codec = Http1ClientCodec::new();
    let mut input = BytesMut::from(
        &b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\nTransfer-Encoding: chunked\r\n\r\n"[..],
    );
    assert!(matches!(
        codec.decode(&mut input),
        Err(HttpError::AmbiguousBodyLength)
    ));
}
#[test]
fn reject_invalid_crlf_after_chunk() {
    // "XX" sits where the CRLF terminating the 5-byte chunk belongs.
    let mut codec = Http1ClientCodec::new();
    let wire = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n5\r\nhelloXX0\r\n\r\n";
    let mut input = BytesMut::from(&wire[..]);
    assert!(matches!(
        codec.decode(&mut input),
        Err(HttpError::BadChunkedEncoding)
    ));
}
}