#![cfg_attr(not(test), warn(clippy::nursery, clippy::unwrap_used, clippy::todo, clippy::dbg_macro,))]
pub use async_iterator::LendingIterator;
use bytes::{Buf, Bytes, BytesMut};
pub use futures_util::{TryStream, TryStreamExt};
pub use http::header;
use http::{HeaderMap, HeaderName, HeaderValue};
use httparse::{EMPTY_HEADER, Status, parse_headers};
use memchr::memmem::Finder;
use std::{
error::Error as StdError,
marker::PhantomPinned,
mem,
ops::Not,
pin::Pin,
slice,
str::FromStr,
task::{Context, Poll},
};
use thiserror::Error;
/// Errors produced while iterating over the parts of a multipart stream or reading a part body.
#[derive(Debug, Error)]
pub enum Error {
/// The underlying stream ended while the parser still needed more bytes.
#[error("the stream has been terminated before the end of the part")]
EarlyTerminate,
/// The underlying stream yielded an error; the original error is boxed inside.
#[error("stream error: {0}")]
StreamError(Box<dyn StdError + Send + Sync>),
/// The header block of a part could not be parsed.
#[error("parse error: {0}")]
ParseError(#[from] ParseError),
/// The next part was requested while the parser was still in the body-streaming state.
#[error("body stream is not consumed")]
BodyNotConsumed,
}
/// Position of the parser inside the multipart document.
///
/// The `usize` carried by the first three variants is the offset into the internal buffer at
/// which the next search starts.
#[derive(Debug)]
enum ParserState {
/// Looking for the next `--boundary\r\n` delimiter line.
Preamble(usize),
/// Delimiter consumed; looking for the blank line that terminates the header block.
ReadingHeaders(usize),
/// Headers consumed; body bytes are handed out until the next delimiter.
StreamingBody(usize),
/// The closing delimiter (`--boundary--`) has been seen.
Finished,
}
/// Errors raised while locating or parsing the header block of a part.
#[derive(Error, Debug)]
pub enum ParseError {
/// Error reported by `httparse` while parsing the header block.
#[error(transparent)]
Other(#[from] httparse::Error),
/// A failed search computed the same scan offset it started from.
#[error("buffer no change")]
BufferNoChange,
/// `httparse` reported a partial header block even though the terminating blank line was found.
#[error("incomplete headers content")]
TryParsePartial,
}
// Line terminator used around delimiter lines.
const CRLF: &[u8] = b"\r\n";
// Prefix of every delimiter line and suffix of the closing delimiter.
const DOUBLE_HYPHEN: &[u8] = b"--";
// Blank line that separates a part's header block from its body.
const HEADER_BODY_SPLITTER: &[u8] = b"\r\n\r\n";
/// Streaming multipart parser over a stream of [`Bytes`] chunks.
///
/// Parts are obtained through the [`LendingIterator`] implementation; every [`Part`] mutably
/// borrows the parser for as long as it is alive.
pub struct MultipartStream<S>
where
S: TryStream<Ok = Bytes> + Unpin,
S::Error: StdError + Send + Sync + 'static,
{
/// Source of raw chunks.
rx: S,
/// Set once `rx` has returned `None`.
terminated: bool,
/// Current position inside the multipart document.
state: ParserState,
/// Delimiter bytes: CRLF, `--`, the boundary, CRLF.
pattern: Box<[u8]>,
/// Searches for the delimiter without its trailing CRLF (CRLF, `--`, boundary).
boundary_finder: Finder<'static>,
/// Searches for the delimiter without its leading CRLF (`--`, boundary, CRLF).
boundary_finder_no_crlf: Finder<'static>,
/// Searches for the blank line that ends a header block.
header_body_splitter_finder: Finder<'static>,
/// Bytes received from `rx` that have not been consumed yet.
buf: BytesMut,
/// Opts the type out of `Unpin`.
_pin: PhantomPinned,
}
impl<S> MultipartStream<S>
where
S: TryStream<Ok = Bytes> + Unpin,
S::Error: StdError + Send + Sync + 'static,
{
/// Creates a parser that reads multipart data from `stream` and splits it on `boundary`.
///
/// `boundary` is the bare boundary token, without the leading `--`.
///
/// # Panics
///
/// Panics if `boundary` is empty or longer than 70 bytes.
#[inline]
pub fn new(stream: S, boundary: &[u8]) -> Self {
    let boundary_len = boundary.len();
    assert!(boundary_len > 0 && boundary_len <= 70, "boundary length must be between 1 and 70 bytes");
    // Full delimiter: CRLF "--" boundary CRLF.
    let pattern = [CRLF, DOUBLE_HYPHEN, boundary, CRLF].concat().into_boxed_slice();
    // The finders own a copy of their needle (`into_owned`), so they never alias the boxed
    // pattern and no `unsafe` lifetime extension is required. The needles are at most 76 bytes.
    let boundary_finder = Finder::new(&pattern[..pattern.len() - CRLF.len()]).into_owned();
    let boundary_finder_no_crlf = Finder::new(&pattern[CRLF.len()..]).into_owned();
    Self {
        rx: stream,
        terminated: false,
        state: ParserState::Preamble(0),
        buf: BytesMut::with_capacity(4 * 1024),
        pattern,
        boundary_finder,
        boundary_finder_no_crlf,
        header_body_splitter_finder: Finder::new(HEADER_BODY_SPLITTER),
        _pin: PhantomPinned,
    }
}
#[inline]
fn update_scan(&mut self, new_scan: usize) {
use ParserState::*;
match &mut self.state {
Preamble(scan) | ReadingHeaders(scan) | StreamingBody(scan) => *scan = new_scan,
Finished => unreachable!("cannot invoke add_scan on finished state"),
}
}
/// Polls the next chunk of the body of the part that is currently being streamed.
///
/// Returns `Ready(None)` when the parser is not in the body-streaming state, which is the case
/// once the delimiter that ends the body has been reached. The chunk returned together with
/// that transition holds the bytes preceding the delimiter and may be empty.
///
/// # Errors
///
/// * [`Error::EarlyTerminate`] if the stream ends before the delimiter that closes the body.
/// * [`Error::StreamError`] if the underlying stream fails.
#[inline]
fn poll_next_body_chunk(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
    use ParserState::*;
    use Poll::*;
    let pattern_len = self.pattern.len();
    // Length of CRLF "--" boundary, i.e. the delimiter without its two trailing bytes.
    let sub_pattern_len = pattern_len - CRLF.len();
    loop {
        let prev_buf_len = self.buf.len();
        let scan = match self.state {
            Preamble(_) | ReadingHeaders(_) | Finished => return Ready(None),
            StreamingBody(scan) => scan,
        };
        if prev_buf_len >= pattern_len + scan {
            if let Some(pos) = self.boundary_finder.find(&self.buf[scan..]) {
                let pattern_start = scan + pos;
                let tail_start = pattern_start + sub_pattern_len;
                // The two bytes after the match decide what it is.
                match self.buf.get(tail_start..tail_start + CRLF.len()) {
                    Some(CRLF) => {
                        // Regular delimiter: another part follows. The delimiter stays in the
                        // buffer for `poll_next_part` to consume.
                        self.state = Preamble(0);
                        let chunk = self.buf.split_to(pattern_start).freeze();
                        return Ready(Some(Ok(chunk)));
                    }
                    Some(DOUBLE_HYPHEN) => {
                        // Closing delimiter: anything after it is discarded.
                        self.state = Finished;
                        let chunk = self.buf.split_to(pattern_start).freeze();
                        self.buf.clear();
                        return Ready(Some(Ok(chunk)));
                    }
                    Some(_) => {
                        // The body merely contains the delimiter prefix. Hand out the bytes up
                        // to and including the first byte of the rejected match and rescan the
                        // remainder: a real delimiter may already be buffered right behind it
                        // and must not be emitted as body data.
                        let chunk = self.buf.split_to(pattern_start + 1).freeze();
                        self.update_scan(0);
                        return Ready(Some(Ok(chunk)));
                    }
                    // The bytes after the match have not arrived yet.
                    None => (),
                }
            } else {
                // No delimiter yet: keep the last `sub_pattern_len - 1` bytes, which may be the
                // beginning of a delimiter, and hand out the rest.
                let last_window_end = self.buf.len() - sub_pattern_len + 1;
                let chunk = self.buf.split_to(last_window_end).freeze();
                self.update_scan(0);
                return Ready(Some(Ok(chunk)));
            }
        }
        if self.terminated && self.buf.len() == prev_buf_len {
            return Ready(Some(Err(Error::EarlyTerminate)));
        }
        return match self.rx.try_poll_next_unpin(cx) {
            Ready(Some(Ok(chunk))) => {
                self.buf.extend_from_slice(&chunk);
                continue;
            }
            Ready(Some(Err(err))) => Ready(Some(Err(Error::StreamError(Box::new(err))))),
            Ready(None) => {
                self.terminated = true;
                continue;
            }
            Pending => Pending,
        };
    }
}
/// Advances the parser until the headers of the next part are available.
///
/// Skips everything up to and including the next `--boundary\r\n` delimiter line, parses the
/// header block that follows and returns a [`Part`] that borrows this parser. A part without
/// any header field (blank line directly after the delimiter line) yields an empty header map.
///
/// Returns `Ready(None)` once the closing delimiter has been consumed.
///
/// # Errors
///
/// * [`Error::BodyNotConsumed`] if the body of the previous part has not been read to its end.
/// * [`Error::EarlyTerminate`] if the stream ends before a delimiter or a complete header block.
/// * [`Error::StreamError`] if the underlying stream fails.
/// * [`Error::ParseError`] if the header block is rejected.
fn poll_next_part(&'_ mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Part<'_, S>, Error>>> {
    use ParserState::*;
    use Poll::*;
    // Length of "--" boundary CRLF, i.e. the delimiter without its leading CRLF.
    let pattern_no_crlf_len = self.pattern.len() - CRLF.len();
    loop {
        let prev_buf_len = self.buf.len();
        match self.state {
            Preamble(scan) if prev_buf_len >= pattern_no_crlf_len + scan => {
                if let Some(pos) = self.boundary_finder_no_crlf.find(&self.buf[scan..]) {
                    self.buf.advance(scan + pos + pattern_no_crlf_len);
                    self.state = ReadingHeaders(0);
                    // Look at what is already buffered before asking the stream for more:
                    // the headers may be complete, and the stream may already be exhausted.
                    continue;
                }
                // Not found: the last `pattern_no_crlf_len - 1` bytes may still be the start
                // of a delimiter, so resume the search there.
                let new_scan = prev_buf_len - pattern_no_crlf_len + 1;
                if new_scan == scan {
                    return Ready(Some(Err(ParseError::BufferNoChange.into())));
                }
                self.update_scan(new_scan);
            }
            ReadingHeaders(0) if self.buf.starts_with(CRLF) => {
                // RFC 2046 allows a part without header fields: the blank line follows the
                // delimiter line immediately and the body starts right after it.
                self.buf.advance(CRLF.len());
                self.state = StreamingBody(0);
                return Ready(Some(Ok(Part::new(self, HeaderMap::new()))));
            }
            ReadingHeaders(scan) if prev_buf_len >= HEADER_BODY_SPLITTER.len() + scan => {
                if let Some(pos) = self.header_body_splitter_finder.find(&self.buf[scan..]) {
                    let hdrs_end = scan + pos + HEADER_BODY_SPLITTER.len();
                    let hdrs_content = &self.buf[..hdrs_end];
                    let mut hdrs_buf = [EMPTY_HEADER; 64];
                    match parse_headers(hdrs_content, &mut hdrs_buf) {
                        Ok(Status::Complete(_)) => (),
                        Ok(Status::Partial) => return Ready(Some(Err(ParseError::TryParsePartial.into()))),
                        Err(err) => return Ready(Some(Err(ParseError::Other(err).into()))),
                    }
                    // Unused slots keep an empty name. Entries whose name or value cannot be
                    // converted to the `http` types are skipped.
                    let headers = hdrs_buf
                        .iter()
                        .take_while(|hdr| hdr.name.is_empty().not())
                        .filter_map(|hdr| {
                            let name = HeaderName::from_str(hdr.name);
                            let value = HeaderValue::from_bytes(hdr.value);
                            name.ok().zip(value.ok())
                        })
                        .collect::<HeaderMap>();
                    self.buf.advance(hdrs_end);
                    self.state = StreamingBody(0);
                    return Ready(Some(Ok(Part::new(self, headers))));
                }
                // Not found: the last three bytes may be the start of the blank line.
                let new_scan = self.buf.len() - HEADER_BODY_SPLITTER.len() + 1;
                if new_scan == scan {
                    return Ready(Some(Err(ParseError::BufferNoChange.into())));
                }
                self.update_scan(new_scan);
            }
            Finished => return Ready(None),
            StreamingBody(_) => return Ready(Some(const { Err(Error::BodyNotConsumed) })),
            // Not enough buffered data for the current state.
            _ => (),
        }
        if self.terminated && self.buf.len() == prev_buf_len {
            return Ready(Some(Err(Error::EarlyTerminate)));
        }
        return match self.rx.try_poll_next_unpin(cx) {
            Ready(Some(Ok(chunk))) => {
                self.buf.extend_from_slice(&chunk);
                continue;
            }
            Ready(Some(Err(err))) => Ready(Some(Err(Error::StreamError(Box::new(err))))),
            Ready(None) => {
                self.terminated = true;
                continue;
            }
            Pending => Pending,
        };
    }
}
}
/// Lending iteration over the parts of the stream: every item borrows the parser mutably.
impl<S> LendingIterator for MultipartStream<S>
where
    S: TryStream<Ok = Bytes> + Unpin,
    S::Error: StdError + Send + Sync + 'static,
{
    type Item<'a>
        = Result<Part<'a, S>, Error>
    where
        S: 'a;

    /// Returns a future that resolves to the next part, or to `None` after the last one.
    #[inline]
    fn next(&mut self) -> impl Future<Output = Option<<Self as LendingIterator>::Item<'_>>> {
        let stream = self;
        NextFuture { stream }
    }
}
/// Future that resolves to the next [`Part`] of a [`MultipartStream`].
pub struct NextFuture<'a, S>
where
S: TryStream<Ok = Bytes> + Unpin,
S::Error: StdError + Send + Sync + 'static,
{
/// Parser that is polled for the next part.
stream: &'a mut MultipartStream<S>,
}
impl<'a, S> Future for NextFuture<'a, S>
where
S: TryStream<Ok = Bytes> + Unpin,
S::Error: StdError + Send + Sync + 'static,
{
type Output = Option<Result<Part<'a, S>, Error>>;
/// Delegates to `MultipartStream::poll_next_part` on the borrowed parser.
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Reborrow of the parser; its lifetime is tied to this call, not to `'a`.
let stream: &mut MultipartStream<S> = self.get_mut().stream;
// The transmute converts the call-scoped result into `Self::Output`, which names `'a`.
// NOTE(review): no SAFETY justification is recorded here. This looks sound only as long as
// the future is not polled again while a `Part` it returned is still alive — confirm.
unsafe { mem::transmute(stream.poll_next_part(cx)) }
}
}
/// One part of a multipart stream: its parsed headers plus access to its body.
pub struct Part<'a, S>
where
S: TryStream<Ok = Bytes> + Unpin,
S::Error: StdError + Send + Sync + 'static,
{
/// Parser the body chunks are pulled from.
body: &'a mut MultipartStream<S>,
/// Headers parsed from the part's header block.
headers: HeaderMap,
}
impl<'a, S> Part<'a, S>
where
    S: TryStream<Ok = Bytes> + Unpin,
    S::Error: StdError + Send + Sync + 'static,
{
    /// Pairs the parser with the headers that were parsed for the current part.
    #[inline]
    const fn new(stream: &'a mut MultipartStream<S>, headers: HeaderMap) -> Self {
        Self { headers, body: stream }
    }

    /// Headers of this part.
    #[inline]
    pub const fn headers(&self) -> &HeaderMap {
        &self.headers
    }

    /// Consumes the part and returns its headers; the body is not read.
    #[inline]
    pub fn into_headers(self) -> HeaderMap {
        let Self { headers, .. } = self;
        headers
    }

    /// Consumes the part and returns its body as a stream of chunks.
    ///
    /// The stream ends once the delimiter that closes the part has been reached.
    #[inline]
    pub fn body(self) -> impl TryStream<Ok = Bytes, Error = Error> + 'a {
        let Self { body: parser, .. } = self;
        futures_util::stream::poll_fn(move |cx| parser.poll_next_body_chunk(cx))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::stream;
use std::convert::Infallible;
/// Replays `data` as an infallible stream of `chunk_size`-byte pieces (the last one may be shorter).
fn create_stream_from_chunks(data: &[u8], chunk_size: usize) -> impl TryStream<Ok = Bytes, Error = Infallible> {
    let mut pieces: Vec<Result<Bytes, Infallible>> = Vec::new();
    for piece in data.chunks(chunk_size) {
        pieces.push(Ok(Bytes::copy_from_slice(piece)));
    }
    stream::iter(pieces)
}
/// Drains a body stream into a single buffer, stopping at the first error.
async fn concat_body(s: impl TryStream<Ok = Bytes, Error = Error>) -> Result<Vec<u8>, Error> {
    let collected = s.try_fold(Vec::new(), |mut buffer, piece| async move {
        buffer.extend_from_slice(&piece);
        Ok(buffer)
    });
    collected.await
}
/// A single part delivered in one chunk is parsed, and the stream ends after it.
#[tokio::test]
async fn test_single_part_full_chunk() {
    const BOUNDARY: &str = "boundary";
    const CONTENT: &[u8] = b"\
        --boundary\r\n\
        Content-Disposition: form-data; name=\"field1\"\r\n\
        \r\n\
        value1\r\n\
        --boundary--\r\n";
    let stream = create_stream_from_chunks(CONTENT, CONTENT.len());
    let mut m = MultipartStream::new(stream, BOUNDARY.as_bytes());
    // Unwrap explicitly: a `while let Some(Ok(..))` loop would pass without running any
    // assertion if the very first `next()` returned an error or `None`.
    let part = m.next().await.unwrap().unwrap();
    assert_eq!(part.headers().get("content-disposition").unwrap(), "form-data; name=\"field1\"");
    assert_eq!(&concat_body(part.body()).await.unwrap(), b"value1");
    assert!(m.next().await.is_none());
}
/// Two parts delivered in 5-byte chunks; the second body ends with a CRLF of its own.
#[tokio::test]
async fn test_multiple_parts_small_chunks() {
const BOUNDARY: &str = "X-BOUNDARY";
const BODY: &[u8] = b"\
--X-BOUNDARY\r\n\
Content-Disposition: form-data; name=\"field1\"\r\n\
\r\n\
value1\r\n\
--X-BOUNDARY\r\n\
Content-Disposition: form-data; name=\"field2\"\r\n\
Content-Type: text/plain\r\n\
\r\n\
value2 with CRLF\r\n\r\n\
--X-BOUNDARY--\r\n";
let stream = create_stream_from_chunks(BODY, 5);
let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes());
let part1 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part1.headers().get("content-disposition").unwrap(), "form-data; name=\"field1\"");
assert!(!part1.headers().contains_key("content-type"));
assert_eq!(&concat_body(part1.body()).await.unwrap(), b"value1");
let part2 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part2.headers().get("content-disposition").unwrap(), "form-data; name=\"field2\"");
assert_eq!(part2.headers().get("content-type").unwrap(), "text/plain");
let body = concat_body(part2.body()).await.unwrap();
assert_eq!(&body, b"value2 with CRLF\r\n");
let result = multipart_stream.next().await;
assert!(result.is_none());
}
/// Text before the first delimiter is skipped, and a closing delimiter without trailing CRLF is accepted.
#[tokio::test]
async fn test_with_preamble_and_no_final_crlf() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"\
This is a preamble and should be ignored.\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"field1\"\r\n\
\r\n\
value1\r\n\
--boundary--";
let stream = create_stream_from_chunks(BODY, BODY.len());
let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes());
let part = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part.headers().get("content-disposition").unwrap(), "form-data; name=\"field1\"");
let body = concat_body(part.body()).await.unwrap();
assert_eq!(&body, b"value1");
let result = multipart_stream.next().await;
assert!(result.is_none());
}
/// A stream that ends in the middle of a body makes the body stream fail with `EarlyTerminate`.
#[tokio::test]
async fn test_early_terminate_in_body() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"\
--boundary\r\n\
Content-Disposition: form-data; name=\"field1\"\r\n\
\r\n\
value1 is not complete";
let stream = create_stream_from_chunks(BODY, BODY.len());
let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes());
let part = multipart_stream.next().await.unwrap().unwrap();
let result = concat_body(part.body()).await;
assert!(matches!(result, Err(Error::EarlyTerminate)));
}
/// A stream that ends inside the header block makes `next()` fail with `EarlyTerminate`.
#[tokio::test]
async fn test_early_terminate_in_headers() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"\
--boundary\r\n\
Content-Disposition: form-data; na";
let stream = create_stream_from_chunks(BODY, BODY.len());
let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes());
let result = multipart_stream.next().await;
assert!(matches!(result, Some(Err(Error::EarlyTerminate))));
}
/// An empty input yields `EarlyTerminate` rather than `None`.
#[tokio::test]
async fn test_empty_stream() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"";
let stream = create_stream_from_chunks(BODY, 10);
let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes());
let result = multipart_stream.next().await;
assert!(matches!(result, Some(Err(Error::EarlyTerminate))));
}
/// A part whose body is empty sits between two regular parts; all three are parsed.
#[tokio::test]
async fn test_part_with_empty_body() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"\
--boundary\r\n\
Content-Disposition: form-data; name=\"field1\"\r\n\
\r\n\
value1\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"empty_field\"\r\n\
\r\n\
\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"field2\"\r\n\
\r\n\
value2\r\n\
--boundary--\r\n";
let stream = create_stream_from_chunks(BODY, 15); let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes());
let part1 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part1.headers().get("content-disposition").unwrap(), "form-data; name=\"field1\"");
assert_eq!(&concat_body(part1.body()).await.unwrap(), b"value1");
let part2 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part2.headers().get("content-disposition").unwrap(), "form-data; name=\"empty_field\"");
let body = concat_body(part2.body()).await.unwrap();
assert!(body.is_empty());
let part3 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part3.headers().get("content-disposition").unwrap(), "form-data; name=\"field2\"");
assert_eq!(&concat_body(part3.body()).await.unwrap(), b"value2");
let result = multipart_stream.next().await;
assert!(result.is_none());
}
/// Requesting the next part without reading the current body yields `BodyNotConsumed`.
#[tokio::test]
async fn test_body_not_consumed_error() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"\
--boundary\r\n\
Content-Disposition: form-data; name=\"field1\"\r\n\
\r\n\
value1\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"field2\"\r\n\
\r\n\
value2\r\n\
--boundary--\r\n";
let stream = create_stream_from_chunks(BODY, BODY.len());
let mut m = MultipartStream::new(stream, BOUNDARY.as_bytes());
let _part1 = m.next().await.unwrap().unwrap();
let result = m.next().await;
assert!(matches!(result, Some(Err(Error::BodyNotConsumed))));
}
/// `--boundary` inside a body line (not preceded by CRLF) is treated as body data.
#[tokio::test]
async fn test_boundary_like_string_in_body() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"\
--boundary\r\n\
Content-Disposition: form-data; name=\"field1\"\r\n\
\r\n\
value1 contains --boundary text\r\n\
--boundary--\r\n";
let stream = create_stream_from_chunks(BODY, 20);
let mut m = MultipartStream::new(stream, BOUNDARY.as_bytes());
let part = m.next().await.unwrap().unwrap();
let body = concat_body(part.body()).await.unwrap();
assert_eq!(&body, b"value1 contains --boundary text");
assert!(m.next().await.is_none());
}
/// A header name containing a space is reported as `httparse::Error::HeaderName`.
#[tokio::test]
async fn test_malformed_headers() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"\
--boundary\r\n\
Invalid Header: value\r\n\
\r\n\
body\r\n\
--boundary--\r\n";
let stream = create_stream_from_chunks(BODY, BODY.len());
let mut m = MultipartStream::new(stream, BOUNDARY.as_bytes());
let result = m.next().await.unwrap();
assert!(matches!(result, Err(Error::ParseError(_))));
if let Err(Error::ParseError(ParseError::Other(e))) = result {
assert_eq!(e, httparse::Error::HeaderName);
} else {
panic!("Expected a ParseError::Other with InvalidHeaderName");
}
}
/// Reads two bodies chunk by chunk and checks both the content and the number of chunks handed out.
#[tokio::test]
async fn test_streaming_body() {
const BOUNDARY: &str = "boundary";
const PART1_BODY: &[u8] = b"This is the first part's body, which is quite long to demonstrate streaming.";
const PART2_BODY: &[u8] = b"This is the second part, which is also streamed.";
let body_content = format!(
"--{boundary}\r\nContent-Disposition: form-data; \
name=\"field1\"\r\n\r\n{part1_body}\r\n--{boundary}\r\nContent-Disposition: form-data; \
name=\"field2\"\r\n\r\n{part2_body}\r\n--{boundary}--\r\n",
boundary = BOUNDARY,
part1_body = std::str::from_utf8(PART1_BODY).unwrap(),
part2_body = std::str::from_utf8(PART2_BODY).unwrap(),
);
let body_bytes = body_content.as_bytes();
let chunk_size = 10;
let stream = create_stream_from_chunks(body_bytes, chunk_size);
let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes());
let part1 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part1.headers().get("content-disposition").unwrap(), "form-data; name=\"field1\"");
let mut body_stream1 = part1.body();
let mut collected_body1 = Vec::new();
// Counts the chunks yielded for the current body.
let mut i = 0;
while let Some(chunk_result) = body_stream1.try_next().await.unwrap() {
i += 1;
assert!(!chunk_result.is_empty());
collected_body1.extend_from_slice(&chunk_result);
}
// Release the mutable borrow of the parser before asking for the next part.
drop(body_stream1);
assert_eq!(i, PART1_BODY.len().div_ceil(chunk_size));
assert_eq!(collected_body1, PART1_BODY);
i = 0;
let part2 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part2.headers().get("content-disposition").unwrap(), "form-data; name=\"field2\"");
let mut body_stream2 = part2.body();
let mut collected_body2 = Vec::new();
while let Some(chunk_result) = body_stream2.try_next().await.unwrap() {
i += 1;
assert!(!chunk_result.is_empty());
collected_body2.extend_from_slice(&chunk_result);
}
drop(body_stream2);
assert_eq!(collected_body2, PART2_BODY);
assert_eq!(i, PART2_BODY.len().div_ceil(chunk_size));
assert!(multipart_stream.next().await.is_none());
}
/// The longest boundary accepted by `MultipartStream::new` (70 bytes) works end to end.
#[tokio::test]
async fn test_boundary_max_length_70_bytes() {
let boundary = "a".repeat(70);
let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"field1\"\r\n\r\nvalue1\r\n--{}--\r\n",
boundary, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 20);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(part.headers().get("content-disposition").unwrap(), "form-data; name=\"field1\"");
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value1");
assert!(m.next().await.is_none());
}
/// The shortest boundary accepted by `MultipartStream::new` (1 byte) works end to end.
#[tokio::test]
async fn test_boundary_min_length_1_byte() {
let boundary = "a";
let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"field1\"\r\n\r\nvalue1\r\n--{}--\r\n",
boundary, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 5);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value1");
assert!(m.next().await.is_none());
}
/// A boundary containing `_`, `-`, digits and upper-case letters is matched correctly.
#[tokio::test]
async fn test_boundary_with_special_chars() {
let boundary = "this_-boundary_123_ABC";
let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"field1\"\r\n\r\nvalue1\r\n--{}--\r\n",
boundary, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 20);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value1");
assert!(m.next().await.is_none());
}
/// An 8192-byte header value spread over many chunks is parsed intact.
#[tokio::test]
async fn test_very_long_header_value() {
let boundary = "boundary";
let long_value = "a".repeat(8192); let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"field1\"\r\nX-Long-Header: {}\r\n\r\nvalue1\r\n--{}--\r\n",
boundary, long_value, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 100);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(part.headers().get("x-long-header").unwrap().len(), 8192);
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value1");
assert!(m.next().await.is_none());
}
/// One hundred small parts are parsed in order from 50-byte chunks.
#[tokio::test]
async fn test_many_small_parts() {
let boundary = "boundary";
let num_parts = 100;
let mut body = String::new();
for i in 0..num_parts {
body.push_str(&format!(
"--{}\r\nContent-Disposition: form-data; name=\"field{}\"\r\n\r\nvalue{}\r\n",
boundary, i, i
));
}
body.push_str(&format!("--{}--\r\n", boundary));
let stream = create_stream_from_chunks(body.as_bytes(), 50);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
for i in 0..num_parts {
let part = m.next().await.unwrap().unwrap();
let expected_name = format!("form-data; name=\"field{}\"", i);
assert_eq!(part.headers().get("content-disposition").unwrap().to_str().unwrap(), expected_name);
let expected_value = format!("value{}", i);
assert_eq!(&concat_body(part.body()).await.unwrap(), expected_value.as_bytes());
}
assert!(m.next().await.is_none());
}
/// A 1,000,000-byte body delivered in 8192-byte chunks is reassembled byte for byte.
#[tokio::test]
async fn test_single_very_large_body() {
let boundary = "boundary";
let large_body = "x".repeat(1_000_000); let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"file\"\r\n\r\n{}\r\n--{}--\r\n",
boundary, large_body, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 8192);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let received_body = concat_body(part.body()).await.unwrap();
assert_eq!(received_body.len(), 1_000_000);
assert_eq!(received_body, large_body.as_bytes());
assert!(m.next().await.is_none());
}
/// An empty `name=""` parameter is passed through in the header value unchanged.
#[tokio::test]
async fn test_empty_part_names() {
let boundary = "boundary";
let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"\"\r\n\r\nvalue1\r\n--{}--\r\n",
boundary, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 20);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(part.headers().get("content-disposition").unwrap(), "form-data; name=\"\"");
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value1");
assert!(m.next().await.is_none());
}
/// Despite its name, the part here carries one header: a `Content-Disposition` without parameters.
#[tokio::test]
async fn test_part_with_no_headers() {
let boundary = "boundary";
let body =
format!("--{}\r\nContent-Disposition: form-data\r\n\r\nbody value\r\n--{}--\r\n", boundary, boundary);
let stream = create_stream_from_chunks(body.as_bytes(), 20);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert!(part.headers().get("content-disposition").is_some());
assert_eq!(&concat_body(part.body()).await.unwrap(), b"body value");
assert!(m.next().await.is_none());
}
/// The same document parses identically for chunk sizes from 1 byte up to larger than the input.
#[tokio::test]
async fn test_chunk_size_variations() {
let boundary = "boundary";
let body = b"\
--boundary\r\n\
Content-Disposition: form-data; name=\"field1\"\r\n\
\r\n\
value1\r\n\
--boundary--\r\n";
for chunk_size in [1, 2, 3, 4, 5, 7, 8, 15, 16, 31, 32, 63, 64, 127, 128, 255, 256, 511, 512] {
let stream = create_stream_from_chunks(body, chunk_size);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value1");
assert!(m.next().await.is_none());
}
}
/// A 10,000,000-byte body fed in 100-byte chunks comes out as many chunks of at most 10000 bytes each.
#[tokio::test]
async fn test_memory_efficiency_large_file() {
let boundary = "boundary";
let large_body = "x".repeat(10_000_000); let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"file\"\r\n\r\n{}\r\n--{}--\r\n",
boundary, large_body, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 100);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let mut body_stream = part.body();
let mut chunk_count = 0;
let mut total_bytes = 0;
while let Some(chunk) = body_stream.try_next().await.unwrap() {
chunk_count += 1;
total_bytes += chunk.len();
assert!(chunk.len() <= 10000, "Chunk too large: {}", chunk.len());
}
// Release the mutable borrow of the parser before asking for the next part.
drop(body_stream);
assert_eq!(total_bytes, 10_000_000);
assert!(chunk_count > 100, "Should have received multiple chunks, got {}", chunk_count);
assert!(m.next().await.is_none());
}
/// Ten independent parsers run as separate tokio tasks, each over its own one-part document.
#[tokio::test]
async fn test_concurrent_streams() {
let boundary = "boundary";
// Parses a one-part document whose field name and value carry `idx`.
async fn parse_stream(boundary: &str, idx: usize) {
let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"field{}\"\r\n\r\nvalue{}\r\n--{}--\r\n",
boundary, idx, idx, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 10);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let expected = format!("value{}", idx);
assert_eq!(&concat_body(part.body()).await.unwrap(), expected.as_bytes());
assert!(m.next().await.is_none());
}
let handles: Vec<_> = (0..10).map(|i| tokio::spawn(parse_stream(boundary, i))).collect();
for handle in handles {
handle.await.unwrap();
}
}
/// A browser-style boundary and a file part with `filename` and `Content-Type` are parsed.
#[tokio::test]
async fn test_file_upload_with_filename() {
let boundary = "----WebKitFormBoundary7MA4YWxkTrZu0gW";
let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"test.txt\"\r\nContent-Type: \
text/plain\r\n\r\nThis is file content\r\n--{}--\r\n",
boundary, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 50);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let content_disp = part.headers().get("content-disposition").unwrap().to_str().unwrap();
assert!(content_disp.contains("filename=\"test.txt\""));
assert_eq!(part.headers().get("content-type").unwrap(), "text/plain");
assert_eq!(&concat_body(part.body()).await.unwrap(), b"This is file content");
assert!(m.next().await.is_none());
}
/// A percent-encoded `filename*=UTF-8''...` parameter does not prevent the header from being parsed.
#[tokio::test]
async fn test_unicode_filename_rfc2231() {
let boundary = "boundary";
let body = "--boundary\r\nContent-Disposition: form-data; name=\"file\"; \
filename*=UTF-8''%E4%B8%AD%E6%96%87.txt\r\nContent-Type: \
text/plain\r\n\r\ncontent\r\n--boundary--\r\n";
let stream = create_stream_from_chunks(body.as_bytes(), 30);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert!(part.headers().get("content-disposition").is_some());
assert_eq!(&concat_body(part.body()).await.unwrap(), b"content");
assert!(m.next().await.is_none());
}
/// Several `Content-Disposition` parameters are kept verbatim in the header value.
#[tokio::test]
async fn test_multiple_content_disposition_parameters() {
let boundary = "boundary";
let body = "--boundary\r\nContent-Disposition: form-data; name=\"field\"; filename=\"file.txt\"; \
size=\"1024\"\r\n\r\nvalue\r\n--boundary--\r\n";
let stream = create_stream_from_chunks(body.as_bytes(), 30);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let content_disp = part.headers().get("content-disposition").unwrap().to_str().unwrap();
assert!(content_disp.contains("name=\"field\""));
assert!(content_disp.contains("filename=\"file.txt\""));
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value");
assert!(m.next().await.is_none());
}
/// The `Content-Type` header is reported unchanged for a handful of media types.
#[tokio::test]
async fn test_various_content_types() {
let boundary = "boundary";
for content_type in
["text/plain", "application/json", "application/octet-stream", "image/jpeg", "application/pdf"]
{
let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"file\"\r\nContent-Type: {}\r\n\r\ndummy \
content\r\n--{}--\r\n",
boundary, content_type, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 50);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(part.headers().get("content-type").unwrap(), content_type);
// The body has to be drained before the next part can be requested.
concat_body(part.body()).await.unwrap();
assert!(m.next().await.is_none());
}
}
/// A `Content-Transfer-Encoding` header is exposed like any other header.
#[tokio::test]
async fn test_content_transfer_encoding() {
let boundary = "boundary";
let body = "--boundary\r\nContent-Disposition: form-data; name=\"field\"\r\nContent-Transfer-Encoding: \
binary\r\n\r\nvalue\r\n--boundary--\r\n";
let stream = create_stream_from_chunks(body.as_bytes(), 30);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(part.headers().get("content-transfer-encoding").unwrap(), "binary");
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value");
assert!(m.next().await.is_none());
}
/// Header lookup succeeds regardless of the letter case used in the query.
#[tokio::test]
async fn test_case_insensitive_headers() {
let boundary = "boundary";
let body = "--boundary\r\ncontent-disposition: form-data; name=\"field\"\r\ncontent-type: \
text/plain\r\n\r\nvalue\r\n--boundary--\r\n";
let stream = create_stream_from_chunks(body.as_bytes(), 30);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert!(part.headers().get("content-disposition").is_some());
assert!(part.headers().get("Content-Disposition").is_some());
assert!(part.headers().get("CONTENT-DISPOSITION").is_some());
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value");
assert!(m.next().await.is_none());
}
#[tokio::test]
async fn test_stream_error_during_body() {
let boundary = "boundary";
let body = "--boundary\r\nContent-Disposition: form-data; name=\"field\"\r\n\r\npartial content";
let stream = create_stream_from_chunks(body.as_bytes(), 20);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let result = concat_body(part.body()).await;
assert!(matches!(result, Err(Error::EarlyTerminate)));
}
/// A stream that ends in the middle of the closing delimiter makes the body fail with `EarlyTerminate`.
#[tokio::test]
async fn test_incomplete_boundary_at_end() {
let boundary = "boundary";
let body = "--boundary\r\nContent-Disposition: form-data; name=\"field\"\r\n\r\nvalue\r\n--bound";
let stream = create_stream_from_chunks(body.as_bytes(), 20);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let result = concat_body(part.body()).await;
assert!(matches!(result, Err(Error::EarlyTerminate)));
}
/// A closing delimiter that is not followed by CRLF still terminates the document.
#[tokio::test]
async fn test_malformed_boundary_missing_crlf() {
let boundary = "boundary";
let body = "--boundary\r\nContent-Disposition: form-data; name=\"field\"\r\n\r\nvalue\r\n--boundary--";
let stream = create_stream_from_chunks(body.as_bytes(), 20);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(&concat_body(part.body()).await.unwrap(), b"value");
assert!(m.next().await.is_none()); }
/// A document without any closing delimiter makes the body fail with `EarlyTerminate`.
#[tokio::test]
async fn test_missing_final_boundary() {
let boundary = "boundary";
let body = "--boundary\r\nContent-Disposition: form-data; name=\"field\"\r\n\r\nvalue";
let stream = create_stream_from_chunks(body.as_bytes(), 20);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let result = concat_body(part.body()).await;
assert!(matches!(result, Err(Error::EarlyTerminate)));
}
/// A short boundary token appearing mid-line in the body (not preceded by CRLF) does not split the part.
#[tokio::test]
async fn test_boundary_injection_attack() {
let boundary = "abc";
let body = "--abc\r\nContent-Disposition: form-data; name=\"field\"\r\n\r\nvalue contains --abc text \
inside\r\n--abc--\r\n";
let stream = create_stream_from_chunks(body.as_bytes(), 10);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
let received = concat_body(part.body()).await.unwrap();
assert_eq!(&received, b"value contains --abc text inside");
assert!(m.next().await.is_none());
}
/// A custom header next to `Content-Disposition` is parsed as a separate header; the body is unaffected.
#[tokio::test]
async fn test_header_injection_attempt() {
let boundary = "boundary";
let body = "--boundary\r\nContent-Disposition: form-data; name=\"field\"\r\nX-Custom: \
value\r\n\r\nbody\r\n--boundary--\r\n";
let stream = create_stream_from_chunks(body.as_bytes(), 30);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
let part = m.next().await.unwrap().unwrap();
assert_eq!(part.headers().get("x-custom").unwrap(), "value");
assert_eq!(&concat_body(part.body()).await.unwrap(), b"body");
assert!(m.next().await.is_none());
}
/// A 100,000-byte header value must either parse successfully or be rejected with a parse error.
#[tokio::test]
async fn test_extremely_long_line_in_headers() {
let boundary = "boundary";
let long_value = "a".repeat(100_000); let body = format!(
"--{}\r\nContent-Disposition: form-data; name=\"field\"\r\nX-Long: {}\r\n\r\nbody\r\n--{}--\r\n",
boundary, long_value, boundary
);
let stream = create_stream_from_chunks(body.as_bytes(), 1000);
let mut m = MultipartStream::new(stream, boundary.as_bytes());
match m.next().await {
Some(Ok(part)) => {
concat_body(part.body()).await.unwrap();
assert!(m.next().await.is_none());
}
// Rejecting the oversized header is an accepted outcome as well.
Some(Err(Error::ParseError(_))) => {
}
_ => panic!("Unexpected result"),
}
}
}