use bytes::{Buf, BufMut, Bytes, BytesMut};
use http::{Method, Uri};
use http_body::{Body as HttpBody, Frame, SizeHint};
use std::future::Future;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, ReadBuf};
use tokio::time::{Instant, Sleep};
use crate::error::{Error, Result};
use crate::headers::Headers;
use crate::request::{RequestBody, RequestBodyStream};
use crate::response::Response;
use crate::transport::connector::MaybeHttpsStream;
/// One-shot callback that receives the underlying stream when it can be reused.
///
/// `H1Body` calls it at most once, after the response body has been fully
/// delivered and only if the connection was not marked to be closed.
pub type H1ReuseHook = Box<dyn FnOnce(MaybeHttpsStream) + Send>;
/// Per-request options for `H1Connection::send_request_streaming`.
pub struct H1StreamingOptions {
/// Hook given the stream back once the response body completes on a reusable connection.
pub on_reusable: H1ReuseHook,
/// Idle timeout forwarded to the response body (`H1Body`); `None` disables it.
pub read_idle_timeout: Option<Duration>,
/// Overall timeout forwarded to the response body; its timer starts when the body is created.
pub total_timeout: Option<Duration>,
/// When `true`, the request head is not written again and only the request body is sent.
pub request_head_sent: bool,
}
/// Header-reading loops fail with "Response headers too large" once this many bytes are buffered.
const MAX_HEADERS_SIZE: usize = 64 * 1024;
/// Initial capacity of the buffers used to accumulate response headers.
const INITIAL_HEADERS_CAPACITY: usize = 1024;
/// Number of header slots handed to `httparse` when parsing a response head.
const MAX_HEADERS_COUNT: usize = 100;
/// Initial capacity of the `H1Body` read buffer and upper bound for a single body read.
const STREAM_READ_BUF_SIZE: usize = 64 * 1024;
/// Request chunks up to this size are copied into a scratch buffer and written in one call.
const CHUNKED_COALESCE_COPY_LIMIT: usize = 64 * 1024;
/// An HTTP/1.1 client connection over a plain or TLS stream.
pub struct H1Connection {
// Transport the request is written to and the response is read from.
stream: MaybeHttpsStream,
// Set while parsing a response when the connection must not be reused afterwards.
should_close: bool,
// Reusable scratch space for framing small chunked request bodies.
chunked_write_scratch: BytesMut,
}
/// How the remaining response body is delimited, plus any bytes already buffered for it.
pub(crate) enum H1BodyMode {
/// No (more) body bytes are expected.
Empty,
/// `Content-Length` framing. `remaining` counts every byte not yet yielded,
/// including bytes currently held in `buffer`.
Fixed { remaining: usize, buffer: BytesMut },
/// `Transfer-Encoding: chunked` framing; `buffer` holds undecoded wire bytes.
Chunked { buffer: BytesMut },
/// Body runs until the peer closes the connection.
CloseDelimited { buffer: BytesMut },
}
/// Framing used for an outgoing request body; drives which framing header is emitted.
#[derive(Clone, Copy)]
pub(crate) enum H1RequestBodyKind {
/// No request body; no framing header is added.
None,
/// Body of known size, announced with `Content-Length`.
ContentLength(u64),
/// Body of unknown size, sent with `Transfer-Encoding: chunked`.
Chunked,
}
/// Streaming HTTP/1.1 response body that owns the connection while it is being read.
pub struct H1Body {
// Underlying transport; `None` once the body finished or failed.
stream: Option<MaybeHttpsStream>,
// Current framing state; temporarily replaced with `Empty` while a poll is in progress.
mode: H1BodyMode,
// When true the stream is dropped instead of being passed to `on_reusable`.
should_close: bool,
// Reuse callback; taken (and thus cleared) when the body reaches a terminal state.
on_reusable: Option<H1ReuseHook>,
// Scratch buffer that socket reads land in.
read_buf: BytesMut,
// Set once the body has ended or failed; later polls yield `None`.
terminal: bool,
// Idle limit between successful reads, if any.
read_idle_timeout: Option<Duration>,
// Lazily created idle timer; cleared whenever a read returns data.
read_idle_sleep: Option<Pin<Box<Sleep>>>,
// Overall limit, kept for error reporting.
total_timeout: Option<Duration>,
// Timer for the overall limit, created together with the body.
total_sleep: Option<Pin<Box<Sleep>>>,
}
impl H1Body {
/// Creates a body that streams the response from `stream` according to `mode`.
///
/// The total-timeout timer (if configured) starts counting immediately; the
/// read-idle timer is created later, on the first timeout poll.
fn new(
    stream: MaybeHttpsStream,
    mode: H1BodyMode,
    should_close: bool,
    on_reusable: H1ReuseHook,
    read_idle_timeout: Option<Duration>,
    total_timeout: Option<Duration>,
) -> Self {
    let read_buf = BytesMut::with_capacity(STREAM_READ_BUF_SIZE);
    let total_sleep = total_timeout.map(|limit| Box::pin(tokio::time::sleep(limit)));
    Self {
        stream: Some(stream),
        mode,
        should_close,
        on_reusable: Some(on_reusable),
        read_buf,
        terminal: false,
        read_idle_timeout,
        read_idle_sleep: None,
        total_timeout,
        total_sleep,
    }
}
/// Returns `true` once the body has ended or failed and will yield no more frames.
pub(crate) fn is_terminal(&self) -> bool {
self.terminal
}
/// Reports how many body bytes are still to be yielded, when that is known.
///
/// For `Fixed` bodies `remaining` already accounts for bytes sitting in the
/// mode buffer (`poll_fixed_body` subtracts from `remaining` as it drains that
/// buffer), so the exact size is `remaining` alone. Adding `buffer.len()` on
/// top would over-report the length by the number of pre-read bytes.
fn size_hint(&self) -> SizeHint {
    match &self.mode {
        H1BodyMode::Empty => SizeHint::with_exact(0),
        H1BodyMode::Fixed { remaining, .. } => SizeHint::with_exact(*remaining as u64),
        // Chunked and close-delimited bodies have no length known in advance.
        H1BodyMode::Chunked { .. } | H1BodyMode::CloseDelimited { .. } => SizeHint::default(),
    }
}
/// Ends the body: offers the stream to the reuse hook (unless the connection
/// must close), clears both, marks the body terminal and yields end-of-stream.
fn poll_return_to_pool(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Bytes>>>> {
    let reusable = !self.should_close;
    match (self.stream.take(), self.on_reusable.take()) {
        (Some(stream), Some(hook)) if reusable => hook(stream),
        (stream, hook) => {
            // Nothing to hand back: release the stream first, then the hook.
            drop(stream);
            drop(hook);
        }
    }
    self.terminal = true;
    Poll::Ready(None)
}
/// Puts the body into its terminal state after an error.
///
/// The stream and the reuse hook are dropped (the connection is never reused
/// after a failure) and `err` is returned as the final frame result.
fn fail(&mut self, err: Error) -> Poll<Option<Result<Frame<Bytes>>>> {
    drop(self.stream.take());
    drop(self.on_reusable.take());
    self.terminal = true;
    let outcome: Result<Frame<Bytes>> = Err(err);
    Poll::Ready(Some(outcome))
}
/// Discards the current idle timer; `poll_timeouts` creates a fresh one on its next call.
fn reset_read_idle(&mut self) {
self.read_idle_sleep = None;
}
/// Tells whether any timeout (total or read-idle) needs to be polled.
#[inline]
fn timeouts_enabled(&self) -> bool {
    let has_total_timer = self.total_sleep.is_some();
    let has_idle_limit = self.read_idle_timeout.is_some();
    has_total_timer || has_idle_limit
}
#[inline]
fn poll_timeouts(&mut self, cx: &mut Context<'_>) -> Option<Error> {
if let Some(total) = self.total_sleep.as_mut() {
if total.as_mut().poll(cx).is_ready() {
return Some(Error::TotalTimeout(self.total_timeout.unwrap_or_else(
|| total.deadline().saturating_duration_since(Instant::now()),
)));
}
}
if let Some(read_idle) = self.read_idle_timeout {
let sleep = self
.read_idle_sleep
.get_or_insert_with(|| Box::pin(tokio::time::sleep(read_idle)));
if sleep.as_mut().poll(cx).is_ready() {
return Some(Error::ReadIdleTimeout(read_idle));
}
}
None
}
/// Reads from the stream into `read_buf`, allowing up to `STREAM_READ_BUF_SIZE` bytes.
#[inline]
fn poll_read_into_internal_buffer(&mut self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
self.poll_read_into_internal_buffer_limited(cx, STREAM_READ_BUF_SIZE)
}
/// Performs one read from the stream, appending at most `limit` bytes to `read_buf`.
///
/// `limit` is clamped to `1..=STREAM_READ_BUF_SIZE`. Returns the number of bytes
/// appended (`0` signals end of stream), an `HttpProtocol` error when the stream
/// is gone or the read fails, or `Pending` when no data is available yet.
/// A read that returns data clears the idle timer.
#[inline]
fn poll_read_into_internal_buffer_limited(
&mut self,
cx: &mut Context<'_>,
limit: usize,
) -> Poll<Result<usize>> {
let Some(stream) = self.stream.as_mut() else {
return Poll::Ready(Err(Error::HttpProtocol(
"H1 response body stream is no longer available".into(),
)));
};
let limit = limit.clamp(1, STREAM_READ_BUF_SIZE);
// Make sure the spare capacity can hold a full `limit`-sized read.
if self.read_buf.capacity() - self.read_buf.len() < limit {
self.read_buf.reserve(limit);
}
let n = {
let dst = self.read_buf.chunk_mut();
// SAFETY: `dst` is the buffer's spare capacity; the pointer and length come
// from the same `UninitSlice`, and it is only exposed as `MaybeUninit<u8>`.
let dst_slice: &mut [MaybeUninit<u8>] =
unsafe { std::slice::from_raw_parts_mut(dst.as_mut_ptr().cast(), dst.len()) };
let take = dst_slice.len().min(limit);
let mut read = ReadBuf::uninit(&mut dst_slice[..take]);
match Pin::new(stream).poll_read(cx, &mut read) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => {
return Poll::Ready(Err(Error::HttpProtocol(format!(
"Failed to read H1 response body: {}",
err
))));
}
Poll::Ready(Ok(())) => read.filled().len(),
}
};
// SAFETY: `n` is the filled length reported by `ReadBuf`, so it does not
// exceed the spare-capacity slice handed to the reader.
unsafe {
self.read_buf.advance_mut(n);
}
if n > 0 {
self.reset_read_idle();
}
Poll::Ready(Ok(n))
}
/// Alias for `poll_read_into_internal_buffer`, used by the chunked decoder.
#[inline]
fn poll_read_more(&mut self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
self.poll_read_into_internal_buffer(cx)
}
/// Produces the next frame of a `Content-Length` delimited body.
///
/// `remaining` is the number of body bytes not yet yielded and `buffer` holds
/// bytes carried in the mode. The updated state is stored back into `self.mode`
/// before returning a frame or `Pending`; on completion the mode becomes `Empty`.
#[inline]
fn poll_fixed_body(
&mut self,
cx: &mut Context<'_>,
mut remaining: usize,
mut buffer: BytesMut,
) -> Poll<Option<Result<Frame<Bytes>>>> {
// Everything delivered: finish the body.
if remaining == 0 {
self.mode = H1BodyMode::Empty;
return self.poll_return_to_pool(cx);
}
// Serve bytes carried in the mode buffer first, never more than `remaining`.
if !buffer.is_empty() {
let n = remaining.min(buffer.len());
let chunk = buffer.split_to(n).freeze();
remaining -= n;
self.mode = H1BodyMode::Fixed { remaining, buffer };
return Poll::Ready(Some(Ok(Frame::data(chunk))));
}
// Then bytes left over in the read buffer.
if !self.read_buf.is_empty() {
let n = remaining.min(self.read_buf.len());
let chunk = self.read_buf.split_to(n).freeze();
remaining -= n;
self.mode = H1BodyMode::Fixed { remaining, buffer };
return Poll::Ready(Some(Ok(Frame::data(chunk))));
}
// Nothing buffered: read, asking for no more than the body still needs.
match self.poll_read_into_internal_buffer_limited(cx, remaining) {
Poll::Pending => {
self.mode = H1BodyMode::Fixed { remaining, buffer };
Poll::Pending
}
// End of stream before the announced length was received.
Poll::Ready(Ok(0)) => self.fail(Error::HttpProtocol(format!(
"Connection closed before receiving full body ({} bytes remaining)",
remaining
))),
Poll::Ready(Ok(n)) => {
let take = remaining.min(n);
let chunk = self.read_buf.split_to(take).freeze();
remaining -= take;
self.mode = H1BodyMode::Fixed { remaining, buffer };
Poll::Ready(Some(Ok(Frame::data(chunk))))
}
Poll::Ready(Err(err)) => self.fail(err),
}
}
/// Produces the next frame of a body that is terminated by connection close.
///
/// Buffered bytes (mode buffer first, then the read buffer) are yielded before
/// the stream is read. End of stream ends the body and marks the connection as
/// not reusable.
#[inline]
fn poll_close_delimited(
    &mut self,
    cx: &mut Context<'_>,
    mut buffer: BytesMut,
) -> Poll<Option<Result<Frame<Bytes>>>> {
    let buffered = if !buffer.is_empty() {
        Some(buffer.split())
    } else if !self.read_buf.is_empty() {
        Some(self.read_buf.split())
    } else {
        None
    };
    if let Some(bytes) = buffered {
        self.mode = H1BodyMode::CloseDelimited { buffer };
        return Poll::Ready(Some(Ok(Frame::data(bytes.freeze()))));
    }

    let received = match self.poll_read_into_internal_buffer(cx) {
        Poll::Ready(Ok(n)) => n,
        Poll::Ready(Err(err)) => return self.fail(err),
        Poll::Pending => {
            self.mode = H1BodyMode::CloseDelimited { buffer };
            return Poll::Pending;
        }
    };

    if received == 0 {
        // The peer closed the connection: that is the end of this body.
        self.should_close = true;
        self.mode = H1BodyMode::Empty;
        return self.poll_return_to_pool(cx);
    }

    let chunk = self.read_buf.split_to(received).freeze();
    self.mode = H1BodyMode::CloseDelimited { buffer };
    Poll::Ready(Some(Ok(Frame::data(chunk))))
}
/// Moves everything currently in `read_buf` to the end of `buffer`.
#[inline]
fn drain_read_buf_into(&mut self, buffer: &mut BytesMut) {
    if self.read_buf.is_empty() {
        return;
    }
    let pending = self.read_buf.split();
    buffer.unsplit(pending);
}
/// Discards the trailer section that follows the terminating zero-size chunk.
///
/// Complete lines are removed from the front of `buffer` until an empty line
/// marks the end of the trailers. Returns `Pending` when more data is needed;
/// lines consumed so far stay consumed.
///
/// End of stream inside the trailer section is tolerated and reported as
/// success, but the connection is marked `should_close`: the peer has closed
/// it, so it must not be handed back for reuse.
fn poll_consume_trailers(
    &mut self,
    cx: &mut Context<'_>,
    buffer: &mut BytesMut,
) -> Poll<Result<()>> {
    loop {
        match find_crlf(buffer) {
            // Empty line: end of the trailer section.
            Some(0) => {
                buffer.advance(2);
                return Poll::Ready(Ok(()));
            }
            // A trailer field line: skip it together with its CRLF.
            Some(pos) => buffer.advance(pos + 2),
            None => match self.poll_read_more(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Ok(0)) => {
                    self.should_close = true;
                    return Poll::Ready(Ok(()));
                }
                Poll::Ready(Ok(_)) => self.drain_read_buf_into(buffer),
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            },
        }
    }
}
#[inline]
fn poll_chunked_body(
&mut self,
cx: &mut Context<'_>,
mut buffer: BytesMut,
) -> Poll<Option<Result<Frame<Bytes>>>> {
self.drain_read_buf_into(&mut buffer);
let (chunk_size, line_end) = loop {
if let Some((size, end)) = find_chunk_size(&buffer) {
break (size, end);
}
match self.poll_read_more(cx) {
Poll::Pending => {
self.mode = H1BodyMode::Chunked { buffer };
return Poll::Pending;
}
Poll::Ready(Ok(0)) => {
return self.fail(Error::HttpProtocol(
"Connection closed while reading chunk size".into(),
));
}
Poll::Ready(Ok(_)) => self.drain_read_buf_into(&mut buffer),
Poll::Ready(Err(err)) => return self.fail(err),
}
};
buffer.advance(line_end);
if chunk_size == 0 {
match self.poll_consume_trailers(cx, &mut buffer) {
Poll::Pending => {
self.mode = H1BodyMode::Chunked { buffer };
return Poll::Pending;
}
Poll::Ready(Ok(())) => {
self.mode = H1BodyMode::Empty;
return self.poll_return_to_pool(cx);
}
Poll::Ready(Err(err)) => return self.fail(err),
}
}
let chunk_end = chunk_size + 2;
while buffer.len() < chunk_end {
match self.poll_read_more(cx) {
Poll::Pending => {
self.mode = H1BodyMode::Chunked { buffer };
return Poll::Pending;
}
Poll::Ready(Ok(0)) => {
return self.fail(Error::HttpProtocol(
"Connection closed while reading chunk data".into(),
));
}
Poll::Ready(Ok(_)) => self.drain_read_buf_into(&mut buffer),
Poll::Ready(Err(err)) => return self.fail(err),
}
}
if &buffer[chunk_size..chunk_end] != b"\r\n" {
return self.fail(Error::HttpProtocol(
"Malformed chunk: missing trailing CRLF".into(),
));
}
let chunk = buffer.split_to(chunk_size).freeze();
buffer.advance(2);
self.mode = H1BodyMode::Chunked { buffer };
Poll::Ready(Some(Ok(Frame::data(chunk))))
}
}
impl HttpBody for H1Body {
type Data = Bytes;
type Error = Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>>>> {
let this = &mut *self;
if this.terminal {
return Poll::Ready(None);
}
if this.timeouts_enabled() {
if let Some(err) = this.poll_timeouts(cx) {
return this.fail(err);
}
}
match std::mem::replace(&mut this.mode, H1BodyMode::Empty) {
H1BodyMode::Empty => this.poll_return_to_pool(cx),
H1BodyMode::Fixed { remaining, buffer } => this.poll_fixed_body(cx, remaining, buffer),
H1BodyMode::Chunked { buffer } => this.poll_chunked_body(cx, buffer),
H1BodyMode::CloseDelimited { buffer } => this.poll_close_delimited(cx, buffer),
}
}
fn is_end_stream(&self) -> bool {
self.terminal
}
fn size_hint(&self) -> SizeHint {
self.size_hint()
}
}
/// Chooses the HTTP/1.1 framing for a request body.
///
/// In-memory bodies and streams with a declared length use `Content-Length`;
/// streams of unknown length are sent chunked; an empty body needs no framing.
pub(crate) fn h1_request_body_kind(body: &RequestBody) -> H1RequestBodyKind {
    let sized = |len: usize| H1RequestBodyKind::ContentLength(len as u64);
    match body {
        RequestBody::Empty => H1RequestBodyKind::None,
        RequestBody::Bytes(payload) => sized(payload.len()),
        RequestBody::Text(payload) => sized(payload.len()),
        RequestBody::Json(payload) => sized(payload.len()),
        RequestBody::Form(payload) => sized(payload.len()),
        RequestBody::Stream { content_length, .. } => match content_length {
            Some(len) => H1RequestBodyKind::ContentLength(*len),
            None => H1RequestBodyKind::Chunked,
        },
    }
}
impl H1Connection {
/// Wraps an established stream in a fresh, reusable HTTP/1.1 connection.
pub fn new(stream: MaybeHttpsStream) -> Self {
    // Small initial scratch space; it grows on demand when framing chunks.
    let chunked_write_scratch = BytesMut::with_capacity(256);
    Self {
        stream,
        should_close: false,
        chunked_write_scratch,
    }
}
/// Consumes the connection and returns the underlying stream.
pub fn into_inner(self) -> MaybeHttpsStream {
self.stream
}
/// Returns `true` when response parsing decided the connection must not be reused.
pub fn should_close(&self) -> bool {
self.should_close
}
pub async fn send_request(
&mut self,
method: Method,
uri: &Uri,
headers: impl Into<Headers>,
body: Option<Bytes>,
) -> Result<Response> {
let headers = headers.into();
let request_bytes = self.build_request(
&method,
uri,
&headers,
body.as_ref()
.map(|bytes| H1RequestBodyKind::ContentLength(bytes.len() as u64))
.unwrap_or(H1RequestBodyKind::None),
)?;
self.stream
.write_all(&request_bytes)
.await
.map_err(|e| Error::HttpProtocol(format!("Failed to write request: {}", e)))?;
if let Some(body) = body {
self.stream
.write_all(&body)
.await
.map_err(|e| Error::HttpProtocol(format!("Failed to write body: {}", e)))?;
}
self.stream
.flush()
.await
.map_err(|e| Error::HttpProtocol(format!("Failed to flush: {}", e)))?;
self.read_response(&method).await
}
/// Sends a request and returns the response with a streaming body.
///
/// Consumes the connection: after the response head is parsed, the stream is
/// moved into an `H1Body`, which passes it to `options.on_reusable` once the
/// body completes on a reusable connection.
///
/// When `options.request_head_sent` is `true` only the request body is written.
/// Otherwise the head is written first; for a stream body with a known length
/// the head is written through `write_sized_request_stream_with_head`.
pub async fn send_request_streaming(
mut self,
method: Method,
uri: &Uri,
headers: &Headers,
body: RequestBody,
options: H1StreamingOptions,
) -> Result<Response> {
let body_kind = h1_request_body_kind(&body);
// Built (and the headers validated) even when the head was already sent.
let request_bytes = Self::build_request_bytes(&method, uri, headers, body_kind)?;
if !options.request_head_sent {
match body {
RequestBody::Stream {
stream,
content_length: Some(expected_len),
} => {
self.write_sized_request_stream_with_head(request_bytes, stream, expected_len)
.await?;
}
body => {
self.stream.write_all(&request_bytes).await.map_err(|e| {
Error::HttpProtocol(format!("Failed to write request: {}", e))
})?;
self.write_request_body(body).await?;
}
}
} else {
self.write_request_body(body).await?;
}
self.stream
.flush()
.await
.map_err(|e| Error::HttpProtocol(format!("Failed to flush: {}", e)))?;
let (response, mode) = self.read_streaming_response_headers(&method).await?;
// Capture the flag before `self.stream` is moved into the body.
let should_close = self.should_close;
let body = crate::response::Body::from_h1(H1Body::new(
self.stream,
mode,
should_close,
options.on_reusable,
options.read_idle_timeout,
options.total_timeout,
));
let (status, headers, http_version) = response.into_status_headers_version();
Ok(Response::with_body(status, headers, body, http_version))
}
/// Serializes the request head (request line and headers) without needing a connection.
///
/// Thin wrapper around `build_request_impl`.
pub(crate) fn build_request_bytes(
method: &Method,
uri: &Uri,
headers: &Headers,
body_kind: H1RequestBodyKind,
) -> Result<Vec<u8>> {
Self::build_request_impl(method, uri, headers, body_kind)
}
/// Method-style wrapper around `build_request_impl`; `self` is not used.
fn build_request(
&self,
method: &Method,
uri: &Uri,
headers: &Headers,
body_kind: H1RequestBodyKind,
) -> Result<Vec<u8>> {
Self::build_request_impl(method, uri, headers, body_kind)
}
/// Serializes an HTTP/1.1 request head into bytes.
///
/// * All header names and values are validated before anything is emitted.
/// * Request target: `host:port` for `CONNECT` (port defaults to 443), `*` for
///   `OPTIONS` with path `*`, otherwise the origin-form path of `uri`.
/// * `Host` is always generated from `uri`; caller-supplied `Host` headers are skipped.
/// * A caller-supplied `Content-Length` is dropped for chunked bodies, and a
///   caller-supplied `Transfer-Encoding` is dropped for sized bodies.
/// * `Connection: keep-alive` is added when the caller set no `Connection` header.
/// * The framing header for `body_kind` is added unless the caller supplied one.
///
/// # Errors
/// Fails when header validation fails or a `CONNECT` URI has no host.
fn build_request_impl(
method: &Method,
uri: &Uri,
headers: &Headers,
body_kind: H1RequestBodyKind,
) -> Result<Vec<u8>> {
let mut request = Vec::with_capacity(1024);
// Validate up front so no partially built head is produced for bad input.
for (name, value) in headers.iter() {
validate_header_name(name)?;
validate_header_value(value)?;
}
request.extend_from_slice(method.as_str().as_bytes());
request.push(b' ');
if method == Method::CONNECT {
let host = uri
.host()
.ok_or_else(|| Error::HttpProtocol("CONNECT requires host".into()))?;
request.extend_from_slice(host.as_bytes());
request.push(b':');
let port = uri.port_u16().unwrap_or(443);
request.extend_from_slice(port.to_string().as_bytes());
} else if method == Method::OPTIONS && uri.path() == "*" {
request.push(b'*');
} else {
let path = crate::transport::origin_form_path(uri);
request.extend_from_slice(path.as_bytes());
}
request.extend_from_slice(b" HTTP/1.1\r\n");
// The Host header is written even when the URI has no host (empty value).
request.extend_from_slice(b"Host: ");
if let Some(host) = uri.host() {
request.extend_from_slice(host.as_bytes());
if let Some(port) = uri.port() {
request.push(b':');
request.extend_from_slice(port.as_str().as_bytes());
}
}
request.extend_from_slice(b"\r\n");
let user_has_transfer_encoding = headers
.iter()
.any(|(name, _)| name.eq_ignore_ascii_case("transfer-encoding"));
let user_has_content_length = headers
.iter()
.any(|(name, _)| name.eq_ignore_ascii_case("content-length"));
let mut has_connection_header = false;
for (name, value) in headers.iter() {
if name.eq_ignore_ascii_case("host") {
continue;
}
if name.eq_ignore_ascii_case("connection") {
has_connection_header = true;
}
// Drop the caller's framing header when it contradicts the chosen body framing.
if matches!(body_kind, H1RequestBodyKind::Chunked)
&& name.eq_ignore_ascii_case("content-length")
{
continue;
}
if matches!(body_kind, H1RequestBodyKind::ContentLength(_))
&& name.eq_ignore_ascii_case("transfer-encoding")
{
continue;
}
request.extend_from_slice(name.as_bytes());
request.extend_from_slice(b": ");
request.extend_from_slice(value.as_bytes());
request.extend_from_slice(b"\r\n");
}
if !has_connection_header {
request.extend_from_slice(b"Connection: keep-alive\r\n");
}
match body_kind {
H1RequestBodyKind::None => {}
H1RequestBodyKind::ContentLength(len) => {
if !user_has_content_length {
request.extend_from_slice(b"Content-Length: ");
request.extend_from_slice(len.to_string().as_bytes());
request.extend_from_slice(b"\r\n");
}
}
H1RequestBodyKind::Chunked => {
if !user_has_transfer_encoding {
request.extend_from_slice(b"Transfer-Encoding: chunked\r\n");
}
}
}
// Blank line terminating the header section.
request.extend_from_slice(b"\r\n");
Ok(request)
}
/// Writes the request body to the stream using the framing implied by its variant.
///
/// In-memory bodies are written as-is. A stream with a declared length is
/// written raw and must produce exactly that many bytes; a stream without a
/// length is written as chunked frames followed by the terminating `0` chunk.
/// Empty stream items are skipped. The stream is not flushed here.
async fn write_request_body(&mut self, body: RequestBody) -> Result<()> {
match body {
RequestBody::Empty => Ok(()),
RequestBody::Bytes(bytes) => self.write_sized_request_bytes(bytes, None).await,
RequestBody::Text(text) => {
self.write_sized_request_bytes(Bytes::from(text.into_bytes()), None)
.await
}
RequestBody::Json(bytes) => {
self.write_sized_request_bytes(Bytes::from(bytes), None)
.await
}
RequestBody::Form(text) => {
self.write_sized_request_bytes(Bytes::from(text.into_bytes()), None)
.await
}
RequestBody::Stream {
mut stream,
content_length,
} => {
if let Some(expected_len) = content_length {
let mut sent = 0u64;
while let Some(chunk) =
std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await
{
let chunk = chunk?;
if chunk.is_empty() {
continue;
}
// Check before writing so no more than Content-Length bytes reach the wire.
let next_sent = sent + chunk.len() as u64;
if next_sent > expected_len {
return Err(Error::HttpProtocol(format!(
"sized streaming request body length mismatch: sent more than Content-Length {}",
expected_len
)));
}
self.stream.write_all(&chunk).await.map_err(|e| {
Error::HttpProtocol(format!(
"Failed to write sized streaming request body: {}",
e
))
})?;
sent = next_sent;
}
// The stream ended early: the announced length was not reached.
if sent != expected_len {
return Err(Error::HttpProtocol(format!(
"sized streaming request body length mismatch: sent {} bytes, Content-Length is {}",
sent, expected_len
)));
}
Ok(())
} else {
while let Some(chunk) =
std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await
{
let chunk = chunk?;
// An empty chunk frame would be read as the end of the body, so skip it.
if chunk.is_empty() {
continue;
}
self.write_chunked_body_frame(&chunk).await?;
}
self.stream.write_all(b"0\r\n\r\n").await.map_err(|e| {
Error::HttpProtocol(format!(
"Failed to write final chunked request body marker: {}",
e
))
})
}
}
}
}
/// Writes one chunked-encoding frame: hex size line, data, trailing CRLF.
///
/// Chunks up to `CHUNKED_COALESCE_COPY_LIMIT` bytes are assembled in the
/// scratch buffer and written with a single `write_all`. Larger chunks are
/// written without copying: through `write_tcp_vectored_all` on plain
/// connections, and as three consecutive writes on TLS connections.
/// Empty chunks are ignored.
async fn write_chunked_body_frame(&mut self, chunk: &Bytes) -> Result<()> {
if chunk.is_empty() {
return Ok(());
}
let prefix = format!("{:x}\r\n", chunk.len());
let prefix_bytes = prefix.as_bytes();
if chunk.len() <= CHUNKED_COALESCE_COPY_LIMIT {
self.chunked_write_scratch.clear();
if self.chunked_write_scratch.capacity() < prefix_bytes.len() + chunk.len() + 2 {
self.chunked_write_scratch
.reserve(prefix_bytes.len() + chunk.len() + 2);
}
self.chunked_write_scratch.extend_from_slice(prefix_bytes);
self.chunked_write_scratch.extend_from_slice(chunk);
self.chunked_write_scratch.extend_from_slice(b"\r\n");
return self
.stream
.write_all(&self.chunked_write_scratch)
.await
.map_err(|e| {
Error::HttpProtocol(format!(
"Failed to write chunked request body frame: {}",
e
))
});
}
// Large chunk: avoid copying the payload.
match &mut self.stream {
MaybeHttpsStream::Http(tcp) => {
write_tcp_vectored_all(tcp, prefix_bytes, chunk, b"\r\n")
.await
.map_err(|e| {
Error::HttpProtocol(format!(
"Failed to write large chunked request body frame: {}",
e
))
})
}
MaybeHttpsStream::Https(_) => {
self.stream.write_all(prefix_bytes).await.map_err(|e| {
Error::HttpProtocol(format!(
"Failed to write chunked request body header: {}",
e
))
})?;
self.stream.write_all(chunk).await.map_err(|e| {
Error::HttpProtocol(format!("Failed to write chunked request body data: {}", e))
})?;
self.stream.write_all(b"\r\n").await.map_err(|e| {
Error::HttpProtocol(format!(
"Failed to write chunked request body delimiter: {}",
e
))
})
}
}
}
/// Writes the request head followed by a stream body of declared length.
///
/// Plain-HTTP connections are delegated to
/// `write_sized_request_stream_with_head_http`. Otherwise the body stream is
/// polled once with a no-op waker: if a non-empty chunk is immediately
/// available it is appended to the head so both go out in one write; if the
/// stream is pending or finished the head is written on its own. The rest of
/// the stream is then written chunk by chunk.
///
/// # Errors
/// Fails when the stream yields an error, when a write fails, or when the
/// number of bytes produced differs from `expected_len`.
async fn write_sized_request_stream_with_head(
&mut self,
mut request_bytes: Vec<u8>,
mut stream: RequestBodyStream,
expected_len: u64,
) -> Result<()> {
if let MaybeHttpsStream::Http(tcp_stream) = &mut self.stream {
return write_sized_request_stream_with_head_http(
tcp_stream,
request_bytes,
stream,
expected_len,
)
.await;
}
let mut sent = 0u64;
loop {
// Non-blocking probe; a `Pending` result registers no real waker, the
// stream is polled again with the task's waker in the loop below.
let first_poll = {
let waker = std::task::Waker::noop();
let mut cx = Context::from_waker(waker);
stream.as_mut().poll_next(&mut cx)
};
match first_poll {
Poll::Ready(Some(chunk)) => {
let chunk = chunk?;
// Empty items carry no data: probe again.
if chunk.is_empty() {
continue;
}
let next_sent = sent + chunk.len() as u64;
if next_sent > expected_len {
return Err(Error::HttpProtocol(format!(
"sized streaming request body length mismatch: sent more than Content-Length {}",
expected_len
)));
}
request_bytes.extend_from_slice(&chunk);
self.write_sized_stream_bytes(&request_bytes, "head/body")
.await?;
sent = next_sent;
break;
}
Poll::Ready(None) | Poll::Pending => {
self.write_sized_stream_bytes(&request_bytes, "request")
.await?;
break;
}
}
}
while let Some(chunk) = std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await {
let chunk = chunk?;
if chunk.is_empty() {
continue;
}
let next_sent = sent + chunk.len() as u64;
if next_sent > expected_len {
return Err(Error::HttpProtocol(format!(
"sized streaming request body length mismatch: sent more than Content-Length {}",
expected_len
)));
}
self.write_sized_stream_bytes(&chunk, "body").await?;
sent = next_sent;
}
if sent != expected_len {
return Err(Error::HttpProtocol(format!(
"sized streaming request body length mismatch: sent {} bytes, Content-Length is {}",
sent, expected_len
)));
}
Ok(())
}
/// Writes `bytes` completely; `label` names the part being written in error messages.
///
/// On plain-HTTP connections a non-blocking `try_write` is attempted first and
/// only the unwritten remainder (if any) is awaited. `WouldBlock` falls through
/// to the regular awaited `write_all`, which is also the path for TLS streams.
async fn write_sized_stream_bytes(&mut self, bytes: &[u8], label: &str) -> Result<()> {
if let MaybeHttpsStream::Http(stream) = &mut self.stream {
match stream.try_write(bytes) {
Ok(n) if n == bytes.len() => return Ok(()),
Ok(n) => {
// Partial write: finish the remainder asynchronously.
stream.write_all(&bytes[n..]).await.map_err(|e| {
Error::HttpProtocol(format!(
"Failed to write sized streaming request {}: {}",
label, e
))
})?;
return Ok(());
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(e) => {
return Err(Error::HttpProtocol(format!(
"Failed to write sized streaming request {}: {}",
label, e
)));
}
}
}
self.stream.write_all(bytes).await.map_err(|e| {
Error::HttpProtocol(format!(
"Failed to write sized streaming request {}: {}",
label, e
))
})
}
async fn write_sized_request_bytes(
&mut self,
bytes: Bytes,
expected_len: Option<u64>,
) -> Result<()> {
if let Some(expected_len) = expected_len {
if bytes.len() as u64 != expected_len {
return Err(Error::HttpProtocol(format!(
"request body length mismatch: got {} bytes, Content-Length is {}",
bytes.len(),
expected_len
)));
}
}
if bytes.is_empty() {
return Ok(());
}
self.stream
.write_all(&bytes)
.await
.map_err(|e| Error::HttpProtocol(format!("Failed to write body: {}", e)))
}
async fn read_response(&mut self, method: &Method) -> Result<Response> {
let mut buffer = Vec::with_capacity(INITIAL_HEADERS_CAPACITY);
loop {
let header_end = loop {
if buffer.len() >= MAX_HEADERS_SIZE {
return Err(Error::HttpProtocol("Response headers too large".into()));
}
if let Some(header_end) = find_header_end(&buffer) {
break header_end;
}
let mut read_buf = vec![0u8; 8192];
let n =
self.stream.read(&mut read_buf).await.map_err(|e| {
Error::HttpProtocol(format!("Failed to read response: {}", e))
})?;
if n == 0 {
return Err(Error::HttpProtocol(
"Connection closed before response complete".into(),
));
}
buffer.extend_from_slice(&read_buf[..n]);
};
let (response, consumed) = self
.parse_response_with_remainder(&buffer, header_end, method)
.await?;
buffer = buffer[consumed..].to_vec();
if response.status >= 100 && response.status < 200 {
continue;
}
return Ok(response);
}
}
/// Reads the response head for a streaming response and determines the body mode.
///
/// Fast path: when the first read already contains a complete head of a
/// non-informational response, it is parsed directly from the stack buffer.
/// Otherwise bytes are accumulated in a vector until a head is complete;
/// informational (1xx) responses are skipped and reading continues with the
/// bytes that follow them. Bytes read past the final head end up in the
/// returned `H1BodyMode`.
///
/// # Errors
/// Fails when the connection closes before a head is complete, when the
/// buffered data reaches `MAX_HEADERS_SIZE`, and on read or parse errors.
async fn read_streaming_response_headers(
&mut self,
method: &Method,
) -> Result<(Response, H1BodyMode)> {
let mut first_read_buf = [0u8; 8192];
let first_read_len = self
.stream
.read(&mut first_read_buf)
.await
.map_err(|e| Error::HttpProtocol(format!("Failed to read response: {}", e)))?;
if first_read_len == 0 {
return Err(Error::HttpProtocol(
"Connection closed before response complete".into(),
));
}
let first_read = &first_read_buf[..first_read_len];
let mut buffer = Vec::new();
if let Some(header_end) = find_header_end(first_read) {
let (response, mode) = self.parse_streaming_response(first_read, method)?;
if response.status < 100 || response.status >= 200 {
return Ok((response, mode));
}
// Informational response: keep only what follows its head.
buffer.reserve(INITIAL_HEADERS_CAPACITY.max(first_read.len() - header_end));
buffer.extend_from_slice(&first_read[header_end..]);
} else {
buffer.reserve(INITIAL_HEADERS_CAPACITY.max(first_read.len()));
buffer.extend_from_slice(first_read);
}
loop {
let _header_end = loop {
if buffer.len() >= MAX_HEADERS_SIZE {
return Err(Error::HttpProtocol("Response headers too large".into()));
}
if let Some(header_end) = find_header_end(&buffer) {
break header_end;
}
let mut read_buf = [0u8; 8192];
let n =
self.stream.read(&mut read_buf).await.map_err(|e| {
Error::HttpProtocol(format!("Failed to read response: {}", e))
})?;
if n == 0 {
return Err(Error::HttpProtocol(
"Connection closed before response complete".into(),
));
}
buffer.extend_from_slice(&read_buf[..n]);
};
let (response, mode) = self.parse_streaming_response(&buffer, method)?;
if response.status >= 100 && response.status < 200 {
// Parse again only to learn the head length, then drop that head.
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS_COUNT];
let mut informational = httparse::Response::new(&mut headers);
let headers_len = match informational.parse(&buffer) {
Ok(httparse::Status::Complete(len)) => len,
Ok(httparse::Status::Partial) => {
return Err(Error::HttpProtocol("Incomplete response headers".into()));
}
Err(e) => {
return Err(Error::HttpProtocol(format!(
"Failed to parse response: {}",
e
)));
}
};
buffer = buffer[headers_len..].to_vec();
continue;
}
return Ok((response, mode));
}
}
/// Parses a complete response head from `buffer` and derives the body framing.
///
/// Returns the response (with an empty buffered body) and the `H1BodyMode`
/// seeded with the bytes that follow the head. Side effect: sets
/// `self.should_close` when `Connection` contains the `close` token or when
/// the body is delimited by connection close.
///
/// Framing rules as implemented here: no body for 1xx, 204, 304 and for
/// responses to `HEAD`; any `Transfer-Encoding` header takes precedence over
/// `Content-Length`; only the first `Content-Length` header is used.
///
/// # Errors
/// Fails on malformed or incomplete heads, a missing status code, or an
/// invalid `Content-Length` value.
fn parse_streaming_response(
&mut self,
buffer: &[u8],
request_method: &Method,
) -> Result<(Response, H1BodyMode)> {
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS_COUNT];
let mut response = httparse::Response::new(&mut headers);
let parsed = response
.parse(buffer)
.map_err(|e| Error::HttpProtocol(format!("Failed to parse response: {}", e)))?;
let headers_len = match parsed {
httparse::Status::Complete(len) => len,
httparse::Status::Partial => {
return Err(Error::HttpProtocol("Incomplete response headers".into()));
}
};
let status = response
.code
.ok_or_else(|| Error::HttpProtocol("Missing status code".into()))?;
let version = http_1_version_string(response.version);
let mut response_headers_vec = Vec::new();
let mut transfer_encoding_present = false;
let mut is_chunked = false;
let mut content_length_index = None;
// Single pass: copy the headers and note the ones that affect framing.
for header in response.headers.iter().filter(|h| !h.name.is_empty()) {
let index = response_headers_vec.len();
let value = String::from_utf8_lossy(header.value).into_owned();
if header.name.eq_ignore_ascii_case("connection")
&& header_value_contains_token(&value, "close")
{
self.should_close = true;
} else if header.name.eq_ignore_ascii_case("transfer-encoding") {
transfer_encoding_present = true;
// With several Transfer-Encoding headers the last one decides.
is_chunked = transfer_encoding_final_is_chunked(&value);
} else if header.name.eq_ignore_ascii_case("content-length")
&& content_length_index.is_none()
{
content_length_index = Some(index);
}
response_headers_vec.push((header.name.to_string(), value));
}
let content_length = if transfer_encoding_present {
None
} else if let Some(index) = content_length_index {
Some(parse_content_length(&response_headers_vec[index].1)?)
} else {
None
};
let response_headers = Headers::from(response_headers_vec);
let has_body = !matches!(status, 100..=199 | 204 | 304) && *request_method != Method::HEAD;
// Bytes read past the head already belong to the body.
let initial = BytesMut::from(&buffer[headers_len..]);
let mode = if !has_body {
H1BodyMode::Empty
} else {
if is_chunked {
H1BodyMode::Chunked { buffer: initial }
} else if transfer_encoding_present {
// Transfer-Encoding whose final coding is not chunked: read until close.
self.should_close = true;
H1BodyMode::CloseDelimited { buffer: initial }
} else if let Some(remaining) = content_length {
H1BodyMode::Fixed {
remaining,
buffer: initial,
}
} else {
// No framing information at all: read until close.
self.should_close = true;
H1BodyMode::CloseDelimited { buffer: initial }
}
};
let response = Response::new(status, response_headers, Bytes::new(), version);
Ok((response, mode))
}
/// Parses a response head from `buffer`, reads the complete body and reports
/// how many bytes of `buffer` were consumed.
///
/// `_header_end` is accepted for signature compatibility but not used; the
/// head length is taken from the parser. Sets `self.should_close` when the
/// `Connection` header lists the `close` token or when the body is delimited
/// by connection close.
///
/// `Connection` is matched per comma-separated token (case-insensitive)
/// rather than by substring, so values such as `keep-alive, x-close-hint` do
/// not force the connection to close.
///
/// Framing rules: no body for 1xx, 204, 304 and responses to `HEAD`; any
/// `Transfer-Encoding` header takes precedence over `Content-Length`; a body
/// without usable framing is read until the peer closes the connection.
///
/// # Errors
/// Fails on malformed or incomplete heads, a missing status code, an invalid
/// `Content-Length`, and on errors while reading the body.
async fn parse_response_with_remainder(
    &mut self,
    buffer: &[u8],
    _header_end: usize,
    request_method: &Method,
) -> Result<(Response, usize)> {
    let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS_COUNT];
    let mut response = httparse::Response::new(&mut headers);
    let parsed = response
        .parse(buffer)
        .map_err(|e| Error::HttpProtocol(format!("Failed to parse response: {}", e)))?;
    let headers_len = match parsed {
        httparse::Status::Complete(len) => len,
        httparse::Status::Partial => {
            return Err(Error::HttpProtocol("Incomplete response headers".into()));
        }
    };
    let status = response
        .code
        .ok_or_else(|| Error::HttpProtocol("Missing status code".into()))?;
    let version = format!("HTTP/1.{}", response.version.unwrap_or(1));
    let response_headers: Vec<(String, String)> = response
        .headers
        .iter()
        .filter(|h| !h.name.is_empty())
        .map(|h| {
            (
                h.name.to_string(),
                String::from_utf8_lossy(h.value).to_string(),
            )
        })
        .collect();
    let response_headers = Headers::from(response_headers);

    if let Some(conn) = find_header_value(&response_headers, "connection") {
        let requests_close = conn
            .split(',')
            .any(|token| token.trim().eq_ignore_ascii_case("close"));
        if requests_close {
            self.should_close = true;
        }
    }

    let has_body = !matches!(status, 100..=199 | 204 | 304) && *request_method != Method::HEAD;
    if !has_body {
        let resp = Response::new(status, response_headers, Bytes::new(), version);
        return Ok((resp, headers_len));
    }

    let transfer_encoding = find_header_value(&response_headers, "transfer-encoding");
    let content_length_str = find_header_value(&response_headers, "content-length");
    // Chunked framing applies only when `chunked` is the final transfer coding.
    let is_chunked = transfer_encoding
        .map(|v| {
            v.split(',')
                .next_back()
                .map(|s| s.trim().eq_ignore_ascii_case("chunked"))
                .unwrap_or(false)
        })
        .unwrap_or(false);
    let content_length = if transfer_encoding.is_some() {
        None
    } else if let Some(cl_str) = content_length_str {
        Some(parse_content_length(cl_str)?)
    } else {
        None
    };

    let body_start = &buffer[headers_len..];
    let (body, body_consumed) = if is_chunked {
        let body = self.read_chunked_body(body_start.to_vec()).await?;
        (body, buffer.len())
    } else if let Some(len) = content_length {
        let body = self.read_fixed_body(body_start, len).await?;
        // Only the part of the buffer that belongs to this body counts as consumed.
        let body_from_buffer = body_start.len().min(len);
        (body, headers_len + body_from_buffer)
    } else {
        // Non-chunked Transfer-Encoding or no framing header: the body ends
        // when the connection closes, so the connection cannot be reused.
        self.should_close = true;
        let body = self.read_until_close(body_start).await?;
        (body, buffer.len())
    };

    let resp = Response::new(status, response_headers, body, version);
    Ok((resp, body_consumed))
}
async fn read_until_close(&mut self, initial: &[u8]) -> Result<Bytes> {
let mut body = initial.to_vec();
let mut read_buf = vec![0u8; 8192];
loop {
let n = self.stream.read(&mut read_buf).await.map_err(|e| {
Error::HttpProtocol(format!("Failed to read body (close-delimited): {}", e))
})?;
if n == 0 {
break;
}
body.extend_from_slice(&read_buf[..n]);
}
Ok(Bytes::from(body))
}
/// Reads a body of exactly `content_length` bytes, starting with the bytes
/// already available in `initial` (extra bytes in `initial` are ignored).
///
/// `content_length` comes from the peer, so the up-front allocation is capped;
/// the vector grows as data actually arrives. A bogus, huge `Content-Length`
/// therefore cannot trigger a capacity-overflow panic or a giant allocation
/// before any body byte has been received.
///
/// # Errors
/// Returns `Error::HttpProtocol` when a read fails or the connection closes
/// before the full body has arrived.
async fn read_fixed_body(&mut self, initial: &[u8], content_length: usize) -> Result<Bytes> {
    /// Upper bound for the speculative allocation made from the declared length.
    const MAX_PREALLOCATION: usize = 1024 * 1024;

    let initial_len = initial.len().min(content_length);
    let capacity = content_length.min(MAX_PREALLOCATION).max(initial_len);
    let mut body = Vec::with_capacity(capacity);
    body.extend_from_slice(&initial[..initial_len]);

    // One scratch buffer for all reads instead of a fresh allocation per read.
    let mut scratch = vec![0u8; 8192];
    while body.len() < content_length {
        // Never read past the end of this body.
        let want = (content_length - body.len()).min(scratch.len());
        let n = self
            .stream
            .read(&mut scratch[..want])
            .await
            .map_err(|e| Error::HttpProtocol(format!("Failed to read body: {}", e)))?;
        if n == 0 {
            return Err(Error::HttpProtocol(format!(
                "Connection closed before receiving full body (got {} of {} bytes)",
                body.len(),
                content_length
            )));
        }
        body.extend_from_slice(&scratch[..n]);
    }
    Ok(Bytes::from(body))
}
async fn read_chunked_body(&mut self, initial: Vec<u8>) -> Result<Bytes> {
let mut body = Vec::new();
let mut buffer = initial;
let mut read_buf = vec![0u8; 8192];
loop {
let (chunk_size, line_end) = loop {
if let Some((size, end)) = find_chunk_size(&buffer) {
break (size, end);
}
let n = self.stream.read(&mut read_buf).await.map_err(|e| {
Error::HttpProtocol(format!("Failed to read chunk size: {}", e))
})?;
if n == 0 {
return Err(Error::HttpProtocol(
"Connection closed while reading chunk size".into(),
));
}
buffer.extend_from_slice(&read_buf[..n]);
};
buffer = buffer[line_end..].to_vec();
if chunk_size == 0 {
self.consume_trailers(&mut buffer).await?;
break;
}
let chunk_end = chunk_size + 2; while buffer.len() < chunk_end {
let n = self.stream.read(&mut read_buf).await.map_err(|e| {
Error::HttpProtocol(format!("Failed to read chunk data: {}", e))
})?;
if n == 0 {
return Err(Error::HttpProtocol(
"Connection closed while reading chunk data".into(),
));
}
buffer.extend_from_slice(&read_buf[..n]);
}
body.extend_from_slice(&buffer[..chunk_size]);
buffer = buffer[chunk_end..].to_vec();
}
Ok(Bytes::from(body))
}
/// Discards the trailer section that follows the final chunk.
///
/// Lines are removed from the front of `buffer` one CRLF-terminated line at
/// a time, reading more from the stream when no complete line is buffered.
/// Returns once the empty line has been removed, or when the stream reports
/// EOF. Bytes after the empty line are left in `buffer`.
///
/// # Errors
///
/// Returns `Error::HttpProtocol` if reading from the stream fails.
async fn consume_trailers(&mut self, buffer: &mut Vec<u8>) -> Result<()> {
    let mut scratch = vec![0u8; 4096];
    loop {
        match find_crlf(buffer) {
            Some(line_len) => {
                // Remove the line together with its CRLF terminator.
                buffer.drain(..line_len + 2);
                if line_len == 0 {
                    // The empty line ends the trailer section.
                    return Ok(());
                }
            }
            None => {
                let received = self
                    .stream
                    .read(&mut scratch)
                    .await
                    .map_err(|e| Error::HttpProtocol(format!("Failed to read trailers: {}", e)))?;
                if received == 0 {
                    return Ok(());
                }
                buffer.extend_from_slice(&scratch[..received]);
            }
        }
    }
}
}
/// Writes `request_bytes` followed by a streamed body of declared length to
/// a plain TCP stream.
///
/// The body stream is first polled once without waiting. If a non-empty
/// chunk is immediately available it is appended to `request_bytes` so both
/// go out in a single write; otherwise `request_bytes` is written alone.
/// Remaining chunks are then awaited and written one by one. Empty chunks
/// are skipped.
///
/// # Errors
///
/// Returns `Error::HttpProtocol` if the stream yields more or fewer bytes
/// than `expected_len` or a write fails, and propagates any error yielded by
/// the body stream itself.
async fn write_sized_request_stream_with_head_http(
    tcp_stream: &mut tokio::net::TcpStream,
    mut request_bytes: Vec<u8>,
    mut stream: RequestBodyStream,
    expected_len: u64,
) -> Result<()> {
    let mut sent = 0u64;
    // Set once the stream has reported its end, so that it is never polled
    // again afterwards; a finished stream gives no guarantees about further
    // polls.
    let mut exhausted = false;
    loop {
        let first_poll = {
            // No-op waker: this poll only checks for an already-available
            // chunk. The awaited polls below register a real waker.
            let waker = std::task::Waker::noop();
            let mut cx = Context::from_waker(waker);
            stream.as_mut().poll_next(&mut cx)
        };
        match first_poll {
            Poll::Ready(Some(chunk)) => {
                let chunk = chunk?;
                if chunk.is_empty() {
                    continue;
                }
                let next_sent = sent + chunk.len() as u64;
                if next_sent > expected_len {
                    return Err(Error::HttpProtocol(format!(
                        "sized streaming request body length mismatch: sent more than Content-Length {}",
                        expected_len
                    )));
                }
                request_bytes.extend_from_slice(&chunk);
                tcp_try_write_all(tcp_stream, &request_bytes, "head/body").await?;
                sent = next_sent;
                break;
            }
            Poll::Ready(None) => {
                exhausted = true;
                tcp_try_write_all(tcp_stream, &request_bytes, "request").await?;
                break;
            }
            Poll::Pending => {
                tcp_try_write_all(tcp_stream, &request_bytes, "request").await?;
                break;
            }
        }
    }
    if !exhausted {
        while let Some(chunk) = std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await {
            let chunk = chunk?;
            if chunk.is_empty() {
                continue;
            }
            let next_sent = sent + chunk.len() as u64;
            if next_sent > expected_len {
                return Err(Error::HttpProtocol(format!(
                    "sized streaming request body length mismatch: sent more than Content-Length {}",
                    expected_len
                )));
            }
            tcp_try_write_all(tcp_stream, &chunk, "body").await?;
            sent = next_sent;
        }
    }
    if sent != expected_len {
        return Err(Error::HttpProtocol(format!(
            "sized streaming request body length mismatch: sent {} bytes, Content-Length is {}",
            sent, expected_len
        )));
    }
    Ok(())
}
/// Writes `prefix`, `chunk` and `suffix` to `tcp`, in that order, using
/// vectored writes until every byte has been accepted.
///
/// # Errors
///
/// Returns the underlying I/O error, or `ErrorKind::WriteZero` if the socket
/// accepts zero bytes while data is still pending.
async fn write_tcp_vectored_all(
    tcp: &mut tokio::net::TcpStream,
    prefix: &[u8],
    chunk: &[u8],
    suffix: &[u8],
) -> std::io::Result<()> {
    use std::io::IoSlice;
    let mut bufs = [
        IoSlice::new(prefix),
        IoSlice::new(chunk),
        IoSlice::new(suffix),
    ];
    let mut bufs: &mut [IoSlice<'_>] = &mut bufs;
    // Skip leading empty slices so that all-empty input completes without a
    // write. Otherwise the zero-byte write below would be misreported as
    // `WriteZero`.
    IoSlice::advance_slices(&mut bufs, 0);
    while !bufs.is_empty() {
        let n = tcp.write_vectored(bufs).await?;
        if n == 0 {
            return Err(std::io::ErrorKind::WriteZero.into());
        }
        IoSlice::advance_slices(&mut bufs, n);
    }
    Ok(())
}
/// Writes all of `bytes` to `tcp_stream`, trying a non-blocking write first.
///
/// `try_write` is attempted once. Whatever it did not accept (everything, if
/// it reported `WouldBlock`) is then sent with `write_all`. `label` is only
/// used in the error message to name the part of the request being written.
///
/// # Errors
///
/// Returns `Error::HttpProtocol` wrapping the I/O error of either write.
async fn tcp_try_write_all(
    tcp_stream: &mut tokio::net::TcpStream,
    bytes: &[u8],
    label: &str,
) -> Result<()> {
    let wrap = |e: std::io::Error| {
        Error::HttpProtocol(format!(
            "Failed to write sized streaming request {}: {}",
            label, e
        ))
    };
    // Number of bytes the non-blocking attempt managed to send.
    let accepted = match tcp_stream.try_write(bytes) {
        Ok(n) => n,
        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => 0,
        Err(e) => return Err(wrap(e)),
    };
    if accepted == bytes.len() {
        return Ok(());
    }
    tcp_stream.write_all(&bytes[accepted..]).await.map_err(wrap)
}
/// Locates the end of a header section in `buffer`.
///
/// Returns the index one past the first `\r\n\r\n` sequence, or `None` if
/// the sequence does not occur.
fn find_header_end(buffer: &[u8]) -> Option<usize> {
    const TERMINATOR: &[u8] = b"\r\n\r\n";
    buffer
        .windows(TERMINATOR.len())
        .position(|window| window == TERMINATOR)
        .map(|start| start + TERMINATOR.len())
}
/// Looks up `name` in `headers` by delegating to `Headers::get`.
///
/// Returns the value as a string slice borrowed from `headers`, or `None`
/// when `get` finds nothing. The unit test for this helper expects the lookup
/// to ignore the case of `name`; any such matching happens inside
/// `Headers::get`, not here.
fn find_header_value<'a>(headers: &'a Headers, name: &str) -> Option<&'a str> {
headers.get(name)
}
/// Formats an HTTP/1.x version token from a minor version number.
///
/// `None` is treated as minor version 1, so the result is `"HTTP/1.1"`.
fn http_1_version_string(version: Option<u8>) -> String {
    let minor = version.unwrap_or(1);
    format!("HTTP/1.{}", minor)
}
/// Reports whether a comma-separated header `value` contains `token`.
///
/// Each element is trimmed of surrounding whitespace and compared to `token`
/// ignoring ASCII case.
fn header_value_contains_token(value: &str, token: &str) -> bool {
    for element in value.split(',') {
        if element.trim().eq_ignore_ascii_case(token) {
            return true;
        }
    }
    false
}
/// Reports whether the last element of a comma-separated Transfer-Encoding
/// `value` is `chunked`.
///
/// The last element is trimmed of surrounding whitespace and compared
/// ignoring ASCII case.
fn transfer_encoding_final_is_chunked(value: &str) -> bool {
    match value.rsplit(',').next() {
        Some(last_coding) => last_coding.trim().eq_ignore_ascii_case("chunked"),
        None => false,
    }
}
/// Parses the chunk-size line at the start of `buffer`.
///
/// The line runs up to the first CRLF. Anything after the first `;` (chunk
/// extensions) is ignored, and spaces or tabs around the size are tolerated.
///
/// Returns `(size, offset)`, where `offset` is the index just past the CRLF,
/// or `None` if `buffer` holds no CRLF yet or the size is not a run of
/// hexadecimal digits that fits in `usize`.
fn find_chunk_size(buffer: &[u8]) -> Option<(usize, usize)> {
    let line_len = buffer.windows(2).position(|pair| pair == b"\r\n")?;
    let line = &buffer[..line_len];
    let size_field = line.split(|&b| b == b';').next()?;
    let digits = std::str::from_utf8(size_field)
        .ok()?
        .trim_matches(|c| c == ' ' || c == '\t');
    // `from_str_radix` alone would accept a leading '+', which is not a valid
    // chunk size; require hex digits only so the framing is parsed strictly.
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    let size = usize::from_str_radix(digits, 16).ok()?;
    Some((size, line_len + 2))
}
/// Returns the index of the first `\r\n` pair in `buffer`, if any.
fn find_crlf(buffer: &[u8]) -> Option<usize> {
    buffer.windows(2).position(|pair| pair == b"\r\n")
}
fn validate_header_name(name: &str) -> Result<()> {
if name.is_empty() {
return Err(Error::HttpProtocol("Empty header name".into()));
}
for b in name.bytes() {
if !is_tchar(b) {
return Err(Error::HttpProtocol(format!(
"Invalid character in header name: {:?}",
name
)));
}
}
Ok(())
}
/// Reports whether `b` is a token character: an ASCII letter, an ASCII
/// digit, or one of ``! # $ % & ' * + - . ^ _ ` | ~``.
fn is_tchar(b: u8) -> bool {
    const TOKEN_SYMBOLS: &[u8] = b"!#$%&'*+-.^_`|~";
    b.is_ascii_alphanumeric() || TOKEN_SYMBOLS.contains(&b)
}
/// Checks that `value` contains no NUL, carriage-return or line-feed byte.
///
/// # Errors
///
/// Returns `Error::HttpProtocol` if any of those bytes is present.
fn validate_header_value(value: &str) -> Result<()> {
    let has_forbidden_byte = value
        .bytes()
        .any(|byte| matches!(byte, 0 | b'\r' | b'\n'));
    if has_forbidden_byte {
        return Err(Error::HttpProtocol(
            "Invalid character in header value (CR/LF/NUL not allowed)".into(),
        ));
    }
    Ok(())
}
fn parse_content_length(value: &str) -> Result<usize> {
let parts: Vec<&str> = value.split(',').map(|s| s.trim()).collect();
if parts.is_empty() {
return Err(Error::HttpProtocol("Empty Content-Length".into()));
}
let first = parts[0]
.parse::<usize>()
.map_err(|_| Error::HttpProtocol(format!("Invalid Content-Length: {}", value)))?;
for part in &parts[1..] {
let val = part
.parse::<usize>()
.map_err(|_| Error::HttpProtocol(format!("Invalid Content-Length: {}", value)))?;
if val != first {
return Err(Error::HttpProtocol(format!(
"Conflicting Content-Length values: {}",
value
)));
}
}
Ok(first)
}
// Unit tests for the parsing and validation helpers in this file.
#[cfg(test)]
mod tests {
use super::*;
// The returned offset points just past the blank line that ends the head;
// a head without the blank line yields `None`.
#[test]
fn test_find_header_end() {
let data = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
assert_eq!(find_header_end(data), Some(38));
let partial = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n";
assert_eq!(find_header_end(partial), None);
}
// Results are `(chunk size, offset just past the size line's CRLF)`; a chunk
// extension after `;` does not affect the parsed size.
#[test]
fn test_find_chunk_size() {
assert_eq!(find_chunk_size(b"5\r\nhello"), Some((5, 3)));
assert_eq!(find_chunk_size(b"a\r\n0123456789"), Some((10, 3)));
assert_eq!(find_chunk_size(b"0\r\n"), Some((0, 3)));
assert_eq!(find_chunk_size(b"5;ext=val\r\ndata"), Some((5, 11)));
}
// Lookup is expected to succeed regardless of the case of the requested name
// and to return `None` for a header that is absent.
#[test]
fn test_find_header_value() {
let headers = Headers::from(vec![
("Content-Type".to_string(), "text/html".to_string()),
("Content-Length".to_string(), "100".to_string()),
]);
assert_eq!(
find_header_value(&headers, "content-type"),
Some("text/html")
);
assert_eq!(find_header_value(&headers, "Content-Length"), Some("100"));
assert_eq!(find_header_value(&headers, "missing"), None);
}
// A URI with a query but no path must be serialized with `/` as the path in
// the request line.
#[test]
fn host_only_uri_with_query_uses_slash_origin_form() {
let uri: Uri = "http://127.0.0.1:18743?client_version=26.506.31421"
.parse()
.unwrap();
let request = H1Connection::build_request_bytes(
&Method::GET,
&uri,
&Headers::new(),
H1RequestBodyKind::None,
)
.unwrap();
let request = String::from_utf8(request).unwrap();
assert!(request.starts_with("GET /?client_version=26.506.31421 HTTP/1.1\r\n"));
}
// Names made only of letters, digits and token symbols are accepted.
#[test]
fn test_validate_header_name_valid() {
assert!(validate_header_name("Content-Type").is_ok());
assert!(validate_header_name("X-Custom-Header").is_ok());
assert!(validate_header_name("Accept").is_ok());
assert!(validate_header_name("x-foo-123").is_ok());
assert!(validate_header_name("X!#$%&'*+.^_`|~").is_ok());
}
// The empty name and names containing a space, colon, NUL or parentheses are
// rejected.
#[test]
fn test_validate_header_name_invalid() {
assert!(validate_header_name("").is_err());
assert!(validate_header_name("Content Type").is_err());
assert!(validate_header_name("Content:Type").is_err());
assert!(validate_header_name("Content\x00Type").is_err());
assert!(validate_header_name("Content(Type)").is_err());
}
// Ordinary values, the empty value and values containing tabs are accepted.
#[test]
fn test_validate_header_value_valid() {
assert!(validate_header_value("text/html").is_ok());
assert!(validate_header_value("application/json; charset=utf-8").is_ok());
assert!(validate_header_value("").is_ok());
assert!(validate_header_value("value\twith\ttabs").is_ok());
}
// CR, LF and NUL are rejected so that a value cannot inject extra header
// lines.
#[test]
fn test_validate_header_value_invalid_crlf_injection() {
assert!(validate_header_value("value\r\nEvil-Header: injected").is_err());
assert!(validate_header_value("value\nEvil-Header: injected").is_err());
assert!(validate_header_value("value\rmore").is_err());
assert!(validate_header_value("value\x00more").is_err());
}
// A single decimal number parses to its value.
#[test]
fn test_parse_content_length_valid() {
assert_eq!(parse_content_length("0").unwrap(), 0);
assert_eq!(parse_content_length("100").unwrap(), 100);
assert_eq!(parse_content_length("12345678").unwrap(), 12345678);
}
// A comma-separated list of identical numbers collapses to that number.
#[test]
fn test_parse_content_length_multiple_identical() {
assert_eq!(parse_content_length("100, 100").unwrap(), 100);
assert_eq!(parse_content_length("100, 100, 100").unwrap(), 100);
assert_eq!(parse_content_length("0, 0").unwrap(), 0);
}
// A list whose numbers disagree is an error.
#[test]
fn test_parse_content_length_multiple_conflicting() {
assert!(parse_content_length("100, 200").is_err());
assert!(parse_content_length("0, 1").is_err());
}
// Negative, non-numeric, partially numeric and fractional values are errors.
#[test]
fn test_parse_content_length_invalid() {
assert!(parse_content_length("-1").is_err());
assert!(parse_content_length("abc").is_err());
assert!(parse_content_length("100abc").is_err());
assert!(parse_content_length("100.5").is_err());
}
// Only a complete `\r\n` pair counts; a lone CR, a lone LF and empty input
// yield `None`.
#[test]
fn test_find_crlf() {
assert_eq!(find_crlf(b"\r\n"), Some(0));
assert_eq!(find_crlf(b"hello\r\nworld"), Some(5));
assert_eq!(find_crlf(b"no crlf here"), None);
assert_eq!(find_crlf(b"\r"), None); assert_eq!(find_crlf(b"\n"), None); assert_eq!(find_crlf(b""), None);
}
// First the bytes that must be accepted (range boundaries of letters and
// digits, then every allowed symbol), followed by separators, whitespace and
// NUL, which must be rejected.
#[test]
fn test_is_tchar() {
assert!(is_tchar(b'a'));
assert!(is_tchar(b'z'));
assert!(is_tchar(b'A'));
assert!(is_tchar(b'Z'));
assert!(is_tchar(b'0'));
assert!(is_tchar(b'9'));
assert!(is_tchar(b'!'));
assert!(is_tchar(b'#'));
assert!(is_tchar(b'$'));
assert!(is_tchar(b'%'));
assert!(is_tchar(b'&'));
assert!(is_tchar(b'\''));
assert!(is_tchar(b'*'));
assert!(is_tchar(b'+'));
assert!(is_tchar(b'-'));
assert!(is_tchar(b'.'));
assert!(is_tchar(b'^'));
assert!(is_tchar(b'_'));
assert!(is_tchar(b'`'));
assert!(is_tchar(b'|'));
assert!(is_tchar(b'~'));
assert!(!is_tchar(b' '));
assert!(!is_tchar(b'\t'));
assert!(!is_tchar(b':'));
assert!(!is_tchar(b';'));
assert!(!is_tchar(b'('));
assert!(!is_tchar(b')'));
assert!(!is_tchar(b'<'));
assert!(!is_tchar(b'>'));
assert!(!is_tchar(b'@'));
assert!(!is_tchar(b','));
assert!(!is_tchar(b'/'));
assert!(!is_tchar(b'['));
assert!(!is_tchar(b']'));
assert!(!is_tchar(b'?'));
assert!(!is_tchar(b'='));
assert!(!is_tchar(b'{'));
assert!(!is_tchar(b'}'));
assert!(!is_tchar(b'"'));
assert!(!is_tchar(b'\\'));
assert!(!is_tchar(0)); }
// Upper-case, lower-case and mixed-case hex digits parse to the same size.
#[test]
fn test_find_chunk_size_case_insensitive_hex() {
assert_eq!(find_chunk_size(b"A\r\n"), Some((10, 3)));
assert_eq!(find_chunk_size(b"a\r\n"), Some((10, 3)));
assert_eq!(find_chunk_size(b"FF\r\n"), Some((255, 4)));
assert_eq!(find_chunk_size(b"ff\r\n"), Some((255, 4)));
assert_eq!(find_chunk_size(b"Ff\r\n"), Some((255, 4)));
}
// Chunk extensions are skipped, while the returned offset still covers the
// whole line including the extension.
#[test]
fn test_find_chunk_size_with_extensions() {
assert_eq!(find_chunk_size(b"10;name=value\r\n"), Some((16, 15)));
assert_eq!(find_chunk_size(b"10;name\r\n"), Some((16, 9)));
assert_eq!(find_chunk_size(b"10;a=b;c=d\r\n"), Some((16, 12)));
}
// A multi-digit hex size parses to its full value.
#[test]
fn test_find_chunk_size_large() {
assert_eq!(find_chunk_size(b"FFFFF\r\n"), Some((0xFFFFF, 7)));
}
// Non-hex text, a size with no terminating CRLF, and empty input all yield
// `None`.
#[test]
fn test_find_chunk_size_invalid() {
assert_eq!(find_chunk_size(b"XYZ\r\n"), None);
assert_eq!(find_chunk_size(b"10"), None);
assert_eq!(find_chunk_size(b""), None);
}
}