use crate::parser::{BodyLength, ParseError};
/// Default cap on accepted body size: 1 MiB.
pub const DEFAULT_MAX_BODY_SIZE: usize = 1024 * 1024;

/// Limits and buffer-sizing hints used by the body readers.
#[derive(Debug, Clone)]
pub struct BodyConfig {
    max_size: usize,
    initial_capacity: usize,
}

impl Default for BodyConfig {
    fn default() -> Self {
        BodyConfig {
            max_size: DEFAULT_MAX_BODY_SIZE,
            initial_capacity: 4096,
        }
    }
}

impl BodyConfig {
    /// Creates a configuration with the default limits.
    #[must_use]
    pub fn new() -> Self {
        BodyConfig::default()
    }

    /// Returns the configuration with its maximum body size replaced.
    #[must_use]
    pub fn with_max_size(self, size: usize) -> Self {
        BodyConfig {
            max_size: size,
            ..self
        }
    }

    /// Returns the configuration with its initial output-buffer capacity
    /// replaced.
    #[must_use]
    pub fn with_initial_capacity(self, capacity: usize) -> Self {
        BodyConfig {
            initial_capacity: capacity,
            ..self
        }
    }

    /// Maximum number of decoded body bytes accepted.
    #[must_use]
    pub fn max_size(&self) -> usize {
        self.max_size
    }

    /// Initial capacity used for decoded-output buffers.
    #[must_use]
    pub fn initial_capacity(&self) -> usize {
        self.initial_capacity
    }
}
/// Errors produced while reading or decoding a request body.
#[derive(Debug)]
pub enum BodyError {
    /// The body exceeds the configured maximum size.
    TooLarge {
        /// Declared or accumulated body size in bytes.
        size: usize,
        /// Configured limit that was exceeded.
        max: usize,
    },
    /// The chunked transfer coding was malformed.
    InvalidChunkedEncoding {
        /// Static description of the framing violation.
        detail: &'static str,
    },
    /// The buffer ended before the full body arrived.
    Incomplete {
        /// Bytes received so far.
        received: usize,
        /// Total expected bytes when known; `None` for chunked bodies,
        /// whose total length is not known up front.
        expected: Option<usize>,
    },
    /// The input ended where more body was required.
    UnexpectedEof,
    /// An underlying request parse error.
    Parse(ParseError),
}
impl std::fmt::Display for BodyError {
    /// Renders a human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::TooLarge { size, max } => {
                write!(f, "body too large: {size} bytes exceeds limit of {max}")
            }
            Self::InvalidChunkedEncoding { detail } => {
                write!(f, "invalid chunked encoding: {detail}")
            }
            // Split the known/unknown-total cases into separate arms.
            Self::Incomplete {
                received,
                expected: Some(exp),
            } => {
                write!(f, "incomplete body: received {received} of {exp} bytes")
            }
            Self::Incomplete {
                received,
                expected: None,
            } => {
                write!(f, "incomplete body: received {received} bytes")
            }
            Self::UnexpectedEof => f.write_str("unexpected end of body"),
            Self::Parse(e) => write!(f, "parse error: {e}"),
        }
    }
}
impl std::error::Error for BodyError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Parse(e) => Some(e),
_ => None,
}
}
}
impl From<ParseError> for BodyError {
fn from(e: ParseError) -> Self {
Self::Parse(e)
}
}
/// Reader over a fixed-length (`Content-Length`) body held in a single
/// in-memory buffer.
#[derive(Debug)]
pub struct ContentLengthReader<'a> {
    /// Raw bytes containing (at least a prefix of) the body.
    buffer: &'a [u8],
    /// Declared body length in bytes.
    length: usize,
    /// Read cursor into `buffer`.
    position: usize,
    /// Stored at construction but not read afterwards (hence the allow).
    #[allow(dead_code)]
    config: BodyConfig,
}
impl<'a> ContentLengthReader<'a> {
    /// Creates a reader for a body of exactly `length` bytes.
    ///
    /// # Errors
    /// Returns [`BodyError::TooLarge`] if `length` exceeds the configured
    /// maximum body size.
    pub fn new(buffer: &'a [u8], length: usize, config: &BodyConfig) -> Result<Self, BodyError> {
        if length > config.max_size {
            return Err(BodyError::TooLarge {
                size: length,
                max: config.max_size,
            });
        }
        Ok(Self {
            buffer,
            length,
            position: 0,
            config: config.clone(),
        })
    }

    /// Declared body length in bytes.
    #[must_use]
    pub fn length(&self) -> usize {
        self.length
    }

    /// Bytes not yet consumed via [`read`](Self::read).
    #[must_use]
    pub fn remaining(&self) -> usize {
        self.length.saturating_sub(self.position)
    }

    /// True once the whole declared length has been consumed.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.position >= self.length
    }

    /// Copies up to `dest.len()` body bytes into `dest`, returning how
    /// many were written.
    ///
    /// Returns `Ok(0)` when the body is fully consumed or `dest` is empty,
    /// matching `std::io::Read` semantics.
    ///
    /// # Errors
    /// Returns [`BodyError::Incomplete`] when body bytes remain but the
    /// underlying buffer has been exhausted.
    pub fn read(&mut self, dest: &mut [u8]) -> Result<usize, BodyError> {
        // An empty destination previously reported `Incomplete` even
        // though no read could make progress; treat it as a zero-byte
        // read instead.
        if self.is_complete() || dest.is_empty() {
            return Ok(0);
        }
        let available = self.buffer.len().saturating_sub(self.position);
        let to_read = dest.len().min(self.remaining()).min(available);
        if to_read == 0 {
            // `dest` has room and body bytes remain, but the buffer has
            // nothing left to give.
            return Err(BodyError::Incomplete {
                received: self.position,
                expected: Some(self.length),
            });
        }
        dest[..to_read].copy_from_slice(&self.buffer[self.position..self.position + to_read]);
        self.position += to_read;
        Ok(to_read)
    }

    /// Returns the whole body as an owned `Vec`, marking the reader
    /// complete.
    ///
    /// # Errors
    /// Returns [`BodyError::Incomplete`] if the buffer holds fewer than
    /// `length` bytes.
    pub fn read_all(&mut self) -> Result<Vec<u8>, BodyError> {
        if self.buffer.len() < self.length {
            return Err(BodyError::Incomplete {
                received: self.buffer.len(),
                expected: Some(self.length),
            });
        }
        let body = self.buffer[..self.length].to_vec();
        self.position = self.length;
        Ok(body)
    }

    /// Returns the whole body as a borrowed slice without copying or
    /// advancing the read position.
    ///
    /// # Errors
    /// Returns [`BodyError::Incomplete`] if the buffer holds fewer than
    /// `length` bytes.
    pub fn read_all_borrowed(&self) -> Result<&'a [u8], BodyError> {
        if self.buffer.len() < self.length {
            return Err(BodyError::Incomplete {
                received: self.buffer.len(),
                expected: Some(self.length),
            });
        }
        Ok(&self.buffer[..self.length])
    }
}
/// States of the chunked transfer-coding decoder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkedState {
    /// Expecting a chunk-size line (`SIZE[;ext]\r\n`).
    ChunkSize,
    /// Copying `remaining` bytes of the current chunk's payload.
    ChunkData { remaining: usize },
    /// Expecting the CRLF that terminates a chunk's payload.
    ChunkDataEnd,
    /// Consuming optional trailer lines after the zero-size chunk.
    Trailers,
    /// Terminal CRLF consumed; decoding is finished.
    Complete,
}
/// Decoder for `Transfer-Encoding: chunked` bodies held fully in memory.
#[derive(Debug)]
pub struct ChunkedReader<'a> {
    /// Encoded input, chunk framing included.
    buffer: &'a [u8],
    /// Parse cursor into `buffer`.
    position: usize,
    /// Current state-machine state.
    state: ChunkedState,
    /// Total decoded payload bytes so far.
    total_size: usize,
    /// Limits (`max_size`) and output sizing (`initial_capacity`).
    config: BodyConfig,
}
impl<'a> ChunkedReader<'a> {
    /// Creates a decoder positioned at the first chunk-size line of
    /// `buffer`.
    #[must_use]
    pub fn new(buffer: &'a [u8], config: &BodyConfig) -> Self {
        Self {
            buffer,
            position: 0,
            state: ChunkedState::ChunkSize,
            total_size: 0,
            config: config.clone(),
        }
    }

    /// True once the terminal chunk and trailers have been consumed.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.state == ChunkedState::Complete
    }

    /// Total decoded payload bytes so far.
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.total_size
    }

    /// Parses the chunk-size line at the current position.
    ///
    /// Returns `(chunk_size, bytes_consumed_including_crlf)`. Chunk
    /// extensions (everything after `;`) are ignored.
    fn parse_chunk_size(&self) -> Result<(usize, usize), BodyError> {
        let remaining = &self.buffer[self.position..];
        let line_end = remaining
            .windows(2)
            .position(|w| w == b"\r\n")
            .ok_or(BodyError::Incomplete {
                received: self.position,
                expected: None,
            })?;
        let size_line = &remaining[..line_end];
        // Strip any chunk extension: "SIZE;ext=value".
        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';') {
            &size_line[..semi]
        } else {
            size_line
        };
        let size_str = std::str::from_utf8(size_str)
            .map_err(|_| BodyError::InvalidChunkedEncoding {
                detail: "invalid UTF-8 in chunk size",
            })?
            .trim();
        // `from_str_radix` accepts a leading `+`, which RFC 9112 does not
        // allow in chunk-size (1*HEXDIG): require pure hex digits.
        if size_str.is_empty() || !size_str.bytes().all(|b| b.is_ascii_hexdigit()) {
            return Err(BodyError::InvalidChunkedEncoding {
                detail: "invalid hex chunk size",
            });
        }
        let size = usize::from_str_radix(size_str, 16).map_err(|_| {
            BodyError::InvalidChunkedEncoding {
                detail: "invalid hex chunk size",
            }
        })?;
        // Defense in depth: cap any single chunk independent of max_size.
        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
        if size > MAX_SINGLE_CHUNK {
            return Err(BodyError::InvalidChunkedEncoding {
                detail: "chunk size exceeds 16MB limit",
            });
        }
        Ok((size, line_end + 2))
    }

    /// Decodes the entire chunked body, returning the concatenated
    /// payload.
    ///
    /// # Errors
    /// - [`BodyError::TooLarge`] if the decoded size would exceed the
    ///   configured maximum.
    /// - [`BodyError::Incomplete`] if the buffer ends mid-body.
    /// - [`BodyError::InvalidChunkedEncoding`] on malformed framing.
    pub fn decode_all(&mut self) -> Result<Vec<u8>, BodyError> {
        let mut output = Vec::with_capacity(self.config.initial_capacity);
        loop {
            match self.state {
                ChunkedState::ChunkSize => {
                    let (size, consumed) = self.parse_chunk_size()?;
                    self.position += consumed;
                    // Enforce the total-size limit before buffering data.
                    let new_total = self.total_size.saturating_add(size);
                    if new_total > self.config.max_size {
                        return Err(BodyError::TooLarge {
                            size: new_total,
                            max: self.config.max_size,
                        });
                    }
                    if size == 0 {
                        // A zero-size chunk terminates the body.
                        self.state = ChunkedState::Trailers;
                    } else {
                        self.state = ChunkedState::ChunkData { remaining: size };
                    }
                }
                ChunkedState::ChunkData { remaining } => {
                    let available = self.buffer.len().saturating_sub(self.position);
                    if available < remaining {
                        // Report the payload bytes actually received
                        // (previously this added the *missing* count,
                        // overstating/understating `received`).
                        return Err(BodyError::Incomplete {
                            received: self.total_size + available,
                            expected: None,
                        });
                    }
                    let chunk_data = &self.buffer[self.position..self.position + remaining];
                    output.extend_from_slice(chunk_data);
                    self.position += remaining;
                    self.total_size += remaining;
                    self.state = ChunkedState::ChunkDataEnd;
                }
                ChunkedState::ChunkDataEnd => {
                    let remaining = &self.buffer[self.position..];
                    if remaining.len() < 2 {
                        return Err(BodyError::Incomplete {
                            received: self.total_size,
                            expected: None,
                        });
                    }
                    if &remaining[..2] != b"\r\n" {
                        return Err(BodyError::InvalidChunkedEncoding {
                            detail: "expected CRLF after chunk data",
                        });
                    }
                    self.position += 2;
                    self.state = ChunkedState::ChunkSize;
                }
                ChunkedState::Trailers => {
                    let remaining = &self.buffer[self.position..];
                    if remaining.starts_with(b"\r\n") {
                        // Bare CRLF ends the trailer section.
                        self.position += 2;
                        self.state = ChunkedState::Complete;
                    } else {
                        // Skip complete trailer lines; their values are
                        // not exposed by this decoder.
                        let line_end = remaining.windows(2).position(|w| w == b"\r\n");
                        match line_end {
                            Some(pos) => {
                                self.position += pos + 2;
                            }
                            None => {
                                return Err(BodyError::Incomplete {
                                    received: self.total_size,
                                    expected: None,
                                });
                            }
                        }
                    }
                }
                ChunkedState::Complete => {
                    break;
                }
            }
        }
        Ok(output)
    }

    /// Number of encoded input bytes consumed so far (framing included).
    #[must_use]
    pub fn bytes_consumed(&self) -> usize {
        self.position
    }
}
/// Parses a complete body from `buffer`, discarding the consumed-bytes
/// count. Returns `None` when no body is expected.
///
/// # Errors
/// Same failure modes as [`parse_body_with_consumed`].
pub fn parse_body(
    buffer: &[u8],
    body_length: BodyLength,
    config: &BodyConfig,
) -> Result<Option<Vec<u8>>, BodyError> {
    parse_body_with_consumed(buffer, body_length, config).map(|(body, _consumed)| body)
}
/// Parses a complete body from `buffer`, also reporting how many input
/// bytes were consumed.
///
/// For `Content-Length` bodies the consumed count equals the declared
/// length; for chunked bodies it includes all framing overhead.
///
/// # Errors
/// Propagates reader errors; `BodyLength::Conflicting` is rejected as an
/// invalid encoding.
pub fn parse_body_with_consumed(
    buffer: &[u8],
    body_length: BodyLength,
    config: &BodyConfig,
) -> Result<(Option<Vec<u8>>, usize), BodyError> {
    match body_length {
        BodyLength::None => Ok((None, 0)),
        // Zero-length bodies short-circuit without touching the buffer.
        BodyLength::ContentLength(0) => Ok((Some(Vec::new()), 0)),
        BodyLength::ContentLength(len) => {
            let body = ContentLengthReader::new(buffer, len, config)?.read_all()?;
            Ok((Some(body), len))
        }
        BodyLength::Chunked => {
            let mut decoder = ChunkedReader::new(buffer, config);
            let body = decoder.decode_all()?;
            Ok((Some(body), decoder.bytes_consumed()))
        }
        BodyLength::Conflicting => Err(BodyError::InvalidChunkedEncoding {
            detail: "conflicting body length indicators",
        }),
    }
}
/// Checks a declared `Content-Length` against the configured maximum.
///
/// # Errors
/// Returns [`BodyError::TooLarge`] when the declared length exceeds
/// `config.max_size`.
pub fn validate_content_length(
    content_length: usize,
    config: &BodyConfig,
) -> Result<(), BodyError> {
    if content_length <= config.max_size {
        Ok(())
    } else {
        Err(BodyError::TooLarge {
            size: content_length,
            max: config.max_size,
        })
    }
}
use asupersync::io::AsyncRead;
use asupersync::stream::Stream;
use fastapi_core::RequestBodyStreamError;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Bodies larger than this threshold are streamed rather than buffered.
pub const DEFAULT_STREAMING_THRESHOLD: usize = 64 * 1024;

/// Tuning knobs for streaming body delivery.
#[derive(Debug, Clone)]
pub struct StreamingBodyConfig {
    /// Content lengths strictly above this value are streamed.
    pub streaming_threshold: usize,
    /// Preferred size of each yielded chunk.
    pub chunk_size: usize,
    /// Hard cap on total body bytes.
    pub max_size: usize,
}

impl Default for StreamingBodyConfig {
    fn default() -> Self {
        StreamingBodyConfig {
            streaming_threshold: DEFAULT_STREAMING_THRESHOLD,
            chunk_size: 8 * 1024,
            max_size: DEFAULT_MAX_BODY_SIZE,
        }
    }
}

impl StreamingBodyConfig {
    /// Equivalent to [`Default::default`].
    #[must_use]
    pub fn new() -> Self {
        StreamingBodyConfig::default()
    }

    /// Returns the configuration with its streaming threshold replaced.
    #[must_use]
    pub fn with_streaming_threshold(self, threshold: usize) -> Self {
        StreamingBodyConfig {
            streaming_threshold: threshold,
            ..self
        }
    }

    /// Returns the configuration with its chunk size replaced, clamped to
    /// at least 1 so read sizing never degenerates to zero.
    #[must_use]
    pub fn with_chunk_size(self, size: usize) -> Self {
        StreamingBodyConfig {
            chunk_size: size.max(1),
            ..self
        }
    }

    /// Returns the configuration with its maximum body size replaced.
    #[must_use]
    pub fn with_max_size(self, size: usize) -> Self {
        StreamingBodyConfig {
            max_size: size,
            ..self
        }
    }

    /// True when a body of `content_length` bytes should be streamed
    /// (strictly greater than the threshold).
    #[must_use]
    pub fn should_stream(&self, content_length: usize) -> bool {
        content_length > self.streaming_threshold
    }
}
/// Async `Stream` yielding a fixed-length body as `Vec<u8>` chunks.
pub struct AsyncContentLengthStream<R> {
    /// Underlying reader; taken out only transiently while polling.
    reader: Option<R>,
    /// Body bytes already read before streaming began.
    initial_buffer: Vec<u8>,
    /// Cursor into `initial_buffer`.
    initial_position: usize,
    /// Declared `Content-Length`.
    expected_size: usize,
    /// Body bytes yielded so far.
    bytes_read: usize,
    /// Preferred size of each yielded chunk.
    chunk_size: usize,
    /// Hard cap on total body bytes.
    max_size: usize,
    /// Scratch buffer for reads from `reader`.
    read_buffer: Vec<u8>,
    /// Set once the full body has been yielded; stream is fused.
    complete: bool,
    /// Set after an error; stream is fused.
    error: bool,
}
impl<R> AsyncContentLengthStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    /// Builds a stream over `content_length` body bytes, serving data
    /// from `initial_buffer` before falling back to `reader`.
    pub fn new(
        initial_buffer: Vec<u8>,
        reader: R,
        content_length: usize,
        config: &StreamingBodyConfig,
    ) -> Self {
        let chunk = config.chunk_size;
        Self {
            reader: Some(reader),
            initial_buffer,
            initial_position: 0,
            expected_size: content_length,
            bytes_read: 0,
            chunk_size: chunk,
            max_size: config.max_size,
            read_buffer: vec![0u8; chunk],
            complete: false,
            error: false,
        }
    }

    /// Like [`Self::new`] with the default [`StreamingBodyConfig`].
    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R, content_length: usize) -> Self {
        let config = StreamingBodyConfig::default();
        Self::new(initial_buffer, reader, content_length, &config)
    }

    /// Declared `Content-Length` of the body.
    #[must_use]
    pub fn expected_size(&self) -> usize {
        self.expected_size
    }

    /// Body bytes yielded so far.
    #[must_use]
    pub fn bytes_read(&self) -> usize {
        self.bytes_read
    }

    /// Body bytes still to be yielded.
    #[must_use]
    pub fn remaining(&self) -> usize {
        self.expected_size.saturating_sub(self.bytes_read)
    }

    /// True once the full body has been yielded.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.complete
    }

    /// Unconsumed bytes left in the pre-read initial buffer.
    fn initial_remaining(&self) -> usize {
        self.initial_buffer.len().saturating_sub(self.initial_position)
    }
}
impl<R> Stream for AsyncContentLengthStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    // Yields body chunks first from the pre-read initial buffer, then from
    // the underlying reader, until `expected_size` bytes have been
    // produced or `max_size` would be exceeded.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Terminal states: the stream is fused after completion or error.
        if self.complete || self.error {
            return Poll::Ready(None);
        }
        if self.bytes_read > self.max_size {
            self.error = true;
            let bytes_read = self.bytes_read;
            let max_size = self.max_size;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: bytes_read,
                max: max_size,
            })));
        }
        if self.bytes_read >= self.expected_size {
            self.complete = true;
            return Poll::Ready(None);
        }
        let remaining_for_body = self.expected_size.saturating_sub(self.bytes_read);
        let remaining_budget = self.max_size.saturating_sub(self.bytes_read);
        // Body bytes remain but the size budget is spent: fail before
        // reading anything further.
        if remaining_for_body > 0 && remaining_budget == 0 {
            self.error = true;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: self.bytes_read.saturating_add(1),
                max: self.max_size,
            })));
        }
        // Serve from the pre-read initial buffer before touching the
        // reader.
        let initial_remaining = self.initial_remaining();
        if initial_remaining > 0 {
            let chunk_size = self
                .chunk_size
                .min(initial_remaining)
                .min(remaining_for_body)
                .min(remaining_budget);
            if chunk_size > 0 {
                let start = self.initial_position;
                let chunk = self.initial_buffer[start..start + chunk_size].to_vec();
                self.initial_position += chunk_size;
                self.bytes_read += chunk_size;
                return Poll::Ready(Some(Ok(chunk)));
            }
        }
        let remaining = self.expected_size.saturating_sub(self.bytes_read);
        let to_read = self.chunk_size.min(remaining).min(remaining_budget);
        if to_read == 0 {
            self.complete = true;
            return Poll::Ready(None);
        }
        if self.read_buffer.len() < to_read {
            self.read_buffer.resize(to_read, 0);
        }
        // Temporarily take the reader so it can be polled through a `Pin`
        // while `self` stays mutably borrowed.
        let mut reader = match self.reader.take() {
            Some(r) => r,
            None => {
                self.error = true;
                return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
            }
        };
        let read_result = {
            let mut read_buf = asupersync::io::ReadBuf::new(&mut self.read_buffer[..to_read]);
            match Pin::new(&mut reader).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let n = read_buf.filled().len();
                    let chunk = read_buf.filled().to_vec();
                    Poll::Ready(Ok((n, chunk)))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        };
        match read_result {
            Poll::Ready(Ok((n, chunk))) => {
                // A zero-byte read before the declared length is reached
                // means the peer closed early.
                if n == 0 {
                    self.error = true;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
                }
                self.bytes_read += n;
                self.reader = Some(reader);
                Poll::Ready(Some(Ok(chunk)))
            }
            Poll::Ready(Err(e)) => {
                self.error = true;
                Poll::Ready(Some(Err(RequestBodyStreamError::Io(e.to_string()))))
            }
            Poll::Pending => {
                // Restore the reader so the next poll can retry.
                self.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}
/// States of the async chunked transfer-coding decoder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncChunkedState {
    /// Expecting a chunk-size line (`SIZE[;ext]\r\n`).
    ChunkSize,
    /// Yielding `remaining` bytes of the current chunk's payload.
    ChunkData { remaining: usize },
    /// Expecting the CRLF that terminates a chunk's payload.
    ChunkDataEnd,
    /// Consuming optional trailer lines after the zero-size chunk.
    Trailers,
    /// Terminal CRLF consumed; stream is finished.
    Complete,
    /// A decode or I/O error occurred; stream is fused.
    Error,
}
/// Async `Stream` that incrementally decodes a chunked body into
/// `Vec<u8>` payload chunks.
pub struct AsyncChunkedStream<R> {
    /// Underlying reader; taken out only transiently while polling.
    #[allow(dead_code)]
    reader: Option<R>,
    /// Current state-machine state.
    state: AsyncChunkedState,
    /// Total decoded payload bytes yielded so far.
    bytes_decoded: usize,
    /// Hard cap on total decoded bytes.
    max_size: usize,
    /// Preferred size of each yielded chunk and of reader reads.
    chunk_size: usize,
    /// Scratch buffer for reads from `reader`.
    #[allow(dead_code)]
    read_buffer: Vec<u8>,
    /// Accumulated undecoded input (initial buffer plus later reads).
    buffer: Vec<u8>,
    /// Parse cursor into `buffer`.
    position: usize,
}
impl<R> AsyncChunkedStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    /// Builds a chunked-body stream over `initial_buffer` plus `reader`.
    ///
    /// # Panics
    /// Panics if `initial_buffer` is larger than `config.max_size`; use
    /// [`Self::try_new`] for a non-panicking variant.
    pub fn new(initial_buffer: Vec<u8>, reader: R, config: &StreamingBodyConfig) -> Self {
        assert!(
            initial_buffer.len() <= config.max_size,
            "initial buffer size {} exceeds max size {}",
            initial_buffer.len(),
            config.max_size
        );
        Self {
            reader: Some(reader),
            state: AsyncChunkedState::ChunkSize,
            bytes_decoded: 0,
            max_size: config.max_size,
            chunk_size: config.chunk_size,
            read_buffer: vec![0u8; config.chunk_size],
            buffer: initial_buffer,
            position: 0,
        }
    }

    /// Fallible variant of [`Self::new`].
    ///
    /// # Errors
    /// Returns an error if `initial_buffer` exceeds `config.max_size`.
    pub fn try_new(
        initial_buffer: Vec<u8>,
        reader: R,
        config: &StreamingBodyConfig,
    ) -> Result<Self, RequestBodyStreamError> {
        if initial_buffer.len() > config.max_size {
            return Err(RequestBodyStreamError::Io(format!(
                "initial buffer size {} exceeds max size {}",
                initial_buffer.len(),
                config.max_size
            )));
        }
        Ok(Self {
            reader: Some(reader),
            state: AsyncChunkedState::ChunkSize,
            bytes_decoded: 0,
            max_size: config.max_size,
            chunk_size: config.chunk_size,
            read_buffer: vec![0u8; config.chunk_size],
            buffer: initial_buffer,
            position: 0,
        })
    }

    /// Like [`Self::new`] with the default [`StreamingBodyConfig`].
    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R) -> Self {
        Self::new(initial_buffer, reader, &StreamingBodyConfig::default())
    }

    /// Decoded payload bytes yielded so far.
    #[must_use]
    pub fn bytes_decoded(&self) -> usize {
        self.bytes_decoded
    }

    /// True once the terminal chunk and trailers have been consumed.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.state == AsyncChunkedState::Complete
    }

    /// Unparsed bytes currently buffered.
    fn buffer_remaining(&self) -> &[u8] {
        &self.buffer[self.position..]
    }

    /// Advances the parse cursor by `n` bytes.
    fn consume(&mut self, n: usize) {
        self.position += n;
    }

    /// Reclaims already-consumed buffer space to bound memory growth.
    fn compact_buffer_if_needed(&mut self) {
        if self.position == 0 {
            return;
        }
        if self.position >= self.buffer.len() {
            self.buffer.clear();
            self.position = 0;
            return;
        }
        // Compact once the dead prefix is large in absolute terms or
        // relative to the buffer size.
        let should_compact = self.position > 8 * 1024 || self.position > (self.buffer.len() / 2);
        if should_compact {
            self.buffer.drain(..self.position);
            self.position = 0;
        }
    }

    /// Reads up to `max_read` bytes from the underlying reader and appends
    /// them to the internal buffer, returning how many were added.
    ///
    /// A zero-byte read is treated as a closed connection and moves the
    /// state machine to `Error`.
    fn poll_read_more_sized(
        &mut self,
        cx: &mut Context<'_>,
        max_read: usize,
    ) -> Poll<Result<usize, RequestBodyStreamError>> {
        self.compact_buffer_if_needed();
        let max_read = max_read.min(self.read_buffer.len());
        if max_read == 0 {
            self.state = AsyncChunkedState::Error;
            return Poll::Ready(Err(RequestBodyStreamError::Io(
                "invalid read buffer size".to_string(),
            )));
        }
        // Temporarily take the reader so it can be polled through a `Pin`
        // while `self` stays mutably borrowed.
        let mut reader = match self.reader.take() {
            Some(r) => r,
            None => {
                self.state = AsyncChunkedState::Error;
                return Poll::Ready(Err(RequestBodyStreamError::ConnectionClosed));
            }
        };
        let read_result = {
            let mut read_buf = asupersync::io::ReadBuf::new(&mut self.read_buffer[..max_read]);
            match Pin::new(&mut reader).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let filled = read_buf.filled();
                    Poll::Ready(Ok(filled.len()))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        };
        match read_result {
            Poll::Ready(Ok(n)) => {
                if n == 0 {
                    self.state = AsyncChunkedState::Error;
                    self.reader = Some(reader);
                    return Poll::Ready(Err(RequestBodyStreamError::ConnectionClosed));
                }
                self.buffer.extend_from_slice(&self.read_buffer[..n]);
                self.reader = Some(reader);
                Poll::Ready(Ok(n))
            }
            Poll::Ready(Err(e)) => {
                self.state = AsyncChunkedState::Error;
                self.reader = Some(reader);
                Poll::Ready(Err(RequestBodyStreamError::Io(e.to_string())))
            }
            Poll::Pending => {
                // Restore the reader so the next poll can retry.
                self.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}
impl<R> Stream for AsyncChunkedStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    // Drives the chunked-decoding state machine, reading more input on
    // demand and yielding decoded payload chunks as they become available.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fused: terminal states never yield again.
        if self.state == AsyncChunkedState::Complete || self.state == AsyncChunkedState::Error {
            return Poll::Ready(None);
        }
        loop {
            match self.state {
                AsyncChunkedState::ChunkSize => {
                    let remaining = self.buffer_remaining();
                    if let Some(crlf_pos) = remaining.windows(2).position(|w| w == b"\r\n") {
                        let size_line = &remaining[..crlf_pos];
                        // Ignore any chunk extension after ';'.
                        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';')
                        {
                            &size_line[..semi]
                        } else {
                            size_line
                        };
                        let size_str = match std::str::from_utf8(size_str) {
                            Ok(s) => s.trim(),
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid UTF-8 in chunk size".to_string(),
                                ))));
                            }
                        };
                        let chunk_size = match usize::from_str_radix(size_str, 16) {
                            Ok(s) => s,
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid hex chunk size".to_string(),
                                ))));
                            }
                        };
                        // Reject bodies that would exceed the configured
                        // cap before buffering the chunk.
                        if chunk_size > 0
                            && self.bytes_decoded.saturating_add(chunk_size) > self.max_size
                        {
                            self.state = AsyncChunkedState::Error;
                            let bytes_decoded = self.bytes_decoded;
                            let max_size = self.max_size;
                            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                                received: bytes_decoded,
                                max: max_size,
                            })));
                        }
                        // Defense in depth: cap any single chunk.
                        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
                        if chunk_size > MAX_SINGLE_CHUNK {
                            self.state = AsyncChunkedState::Error;
                            return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                "chunk size exceeds 16MB limit".to_string(),
                            ))));
                        }
                        self.consume(crlf_pos + 2);
                        if chunk_size == 0 {
                            // A zero-size chunk terminates the body.
                            self.state = AsyncChunkedState::Trailers;
                            continue;
                        }
                        self.state = AsyncChunkedState::ChunkData {
                            remaining: chunk_size,
                        };
                        continue;
                    }
                    // No CRLF yet: bound the size line, then read more.
                    const MAX_CHUNK_SIZE_LINE: usize = 1024;
                    if remaining.len() > MAX_CHUNK_SIZE_LINE {
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "chunk size line too long".to_string(),
                        ))));
                    }
                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::ChunkData { remaining } => {
                    if remaining > 0 && self.bytes_decoded >= self.max_size {
                        self.state = AsyncChunkedState::Error;
                        let bytes_decoded = self.bytes_decoded;
                        let max_size = self.max_size;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                            received: bytes_decoded,
                            max: max_size,
                        })));
                    }
                    let buffer_remaining = self.buffer_remaining();
                    // Yield at most one configured-chunk-size slice per
                    // call; large chunks are split across polls.
                    let to_read = remaining.min(buffer_remaining.len()).min(self.chunk_size);
                    if to_read > 0 {
                        let chunk = buffer_remaining[..to_read].to_vec();
                        self.consume(to_read);
                        self.bytes_decoded += to_read;
                        let new_remaining = remaining - to_read;
                        if new_remaining == 0 {
                            self.state = AsyncChunkedState::ChunkDataEnd;
                        } else {
                            self.state = AsyncChunkedState::ChunkData {
                                remaining: new_remaining,
                            };
                        }
                        return Poll::Ready(Some(Ok(chunk)));
                    }
                    let want = remaining.min(self.chunk_size).max(1);
                    match self.poll_read_more_sized(cx, want) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::ChunkDataEnd => {
                    // Expect the CRLF that terminates the chunk payload.
                    let remaining = self.buffer_remaining();
                    if remaining.len() >= 2 {
                        if &remaining[..2] == b"\r\n" {
                            self.consume(2);
                            self.state = AsyncChunkedState::ChunkSize;
                            continue;
                        }
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "expected CRLF after chunk data".to_string(),
                        ))));
                    }
                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::Trailers => {
                    let remaining = self.buffer_remaining();
                    // A bare CRLF ends the trailer section and the body.
                    if remaining.len() >= 2 && &remaining[..2] == b"\r\n" {
                        self.consume(2);
                        self.state = AsyncChunkedState::Complete;
                        return Poll::Ready(None);
                    }
                    const MAX_TRAILER_LINE: usize = 8 * 1024;
                    if remaining.len() > MAX_TRAILER_LINE {
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "trailer line too long".to_string(),
                        ))));
                    }
                    // Discard complete trailer lines; their values are not
                    // exposed by this stream.
                    if let Some(crlf_pos) = remaining.windows(2).position(|w| w == b"\r\n") {
                        self.consume(crlf_pos + 2);
                        continue;
                    }
                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::Complete | AsyncChunkedState::Error => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
/// Wraps `reader` (preceded by `initial_buffer`) in a sized streaming
/// [`fastapi_core::Body`] of exactly `content_length` bytes.
pub fn create_content_length_stream<R>(
    initial_buffer: Vec<u8>,
    reader: R,
    content_length: usize,
    config: &StreamingBodyConfig,
) -> fastapi_core::Body
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    fastapi_core::Body::streaming_with_size(
        AsyncContentLengthStream::new(initial_buffer, reader, content_length, config),
        content_length,
    )
}
/// Wraps `reader` (preceded by `initial_buffer`) in a streaming
/// [`fastapi_core::Body`] that decodes chunked transfer coding.
///
/// # Panics
/// Panics (via [`AsyncChunkedStream::new`]) if `initial_buffer` is larger
/// than `config.max_size`.
pub fn create_chunked_stream<R>(
    initial_buffer: Vec<u8>,
    reader: R,
    config: &StreamingBodyConfig,
) -> fastapi_core::Body
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    fastapi_core::Body::streaming(AsyncChunkedStream::new(initial_buffer, reader, config))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::task::{Wake, Waker};

    // Shared async test fixtures (previously duplicated in every async
    // test).

    /// Waker that does nothing; these tests poll manually and never park.
    struct NoopWaker;
    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }
    fn noop_waker() -> Waker {
        Waker::from(Arc::new(NoopWaker))
    }

    /// `AsyncRead` that always completes without filling any bytes.
    struct EmptyReader;
    impl AsyncRead for EmptyReader {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut asupersync::io::ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    // ---- BodyConfig ----

    #[test]
    fn body_config_defaults() {
        let config = BodyConfig::default();
        assert_eq!(config.max_size(), DEFAULT_MAX_BODY_SIZE);
        assert_eq!(config.initial_capacity(), 4096);
    }

    #[test]
    fn body_config_custom() {
        let config = BodyConfig::new()
            .with_max_size(2048)
            .with_initial_capacity(1024);
        assert_eq!(config.max_size(), 2048);
        assert_eq!(config.initial_capacity(), 1024);
    }

    // ---- ContentLengthReader ----

    #[test]
    fn content_length_basic() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
        assert_eq!(reader.length(), 13);
        assert_eq!(reader.remaining(), 13);
        assert!(!reader.is_complete());
        let result = reader.read_all().unwrap();
        assert_eq!(result, b"Hello, World!");
        assert!(reader.is_complete());
    }

    #[test]
    fn content_length_zero() {
        let body = b"";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, 0, &config).unwrap();
        assert_eq!(reader.length(), 0);
        assert!(reader.is_complete());
        let result = reader.read_all().unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn content_length_too_large() {
        let body = b"small";
        let config = BodyConfig::new().with_max_size(3);
        let result = ContentLengthReader::new(body, 100, &config);
        assert!(matches!(
            result,
            Err(BodyError::TooLarge { size: 100, max: 3 })
        ));
    }

    #[test]
    fn content_length_incomplete() {
        let body = b"Hello";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, 10, &config).unwrap();
        let result = reader.read_all();
        assert!(matches!(
            result,
            Err(BodyError::Incomplete {
                received: 5,
                expected: Some(10)
            })
        ));
    }

    #[test]
    fn content_length_borrowed() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
        let borrowed = reader.read_all_borrowed().unwrap();
        assert_eq!(borrowed, body);
        // Zero-copy: the slice points into the original buffer.
        assert_eq!(borrowed.as_ptr(), body.as_ptr());
    }

    #[test]
    fn content_length_incremental_read() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
        let mut buf = [0u8; 5];
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf[..n], b"Hello");
        assert_eq!(reader.remaining(), 8);
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf[..n], b", Wor");
        assert_eq!(reader.remaining(), 3);
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 3);
        assert_eq!(&buf[..n], b"ld!");
        assert!(reader.is_complete());
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 0);
    }

    // ---- ChunkedReader ----

    #[test]
    fn chunked_single_chunk() {
        let body = b"5\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_multiple_chunks() {
        let body = b"5\r\nHello\r\n7\r\n, World\r\n1\r\n!\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello, World!");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_empty() {
        let body = b"0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all().unwrap();
        assert!(result.is_empty());
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_with_extension() {
        let body = b"5;ext=value\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
    }

    #[test]
    fn chunked_with_trailers() {
        let body = b"5\r\nHello\r\n0\r\nTrailer: value\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_hex_sizes() {
        let body = b"a\r\n0123456789\r\nF\r\n0123456789ABCDE\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all().unwrap();
        assert_eq!(result.len(), 10 + 15);
    }

    #[test]
    fn chunked_too_large() {
        let body = b"10\r\n0123456789ABCDEF\r\n0\r\n\r\n";
        let config = BodyConfig::new().with_max_size(10);
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::TooLarge { size: 16, max: 10 })
        ));
    }

    #[test]
    fn chunked_invalid_size() {
        let body = b"xyz\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::InvalidChunkedEncoding { detail: _ })
        ));
    }

    #[test]
    fn chunked_missing_crlf() {
        let body = b"5\r\nHelloX0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::InvalidChunkedEncoding {
                detail: "expected CRLF after chunk data"
            })
        ));
    }

    #[test]
    fn chunked_incomplete() {
        let body = b"5\r\nHel";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);
        let result = reader.decode_all();
        assert!(matches!(result, Err(BodyError::Incomplete { .. })));
    }

    // ---- parse_body ----

    #[test]
    fn parse_body_none() {
        let config = BodyConfig::default();
        let result = parse_body(b"ignored", BodyLength::None, &config).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn parse_body_content_length() {
        let config = BodyConfig::default();
        let result = parse_body(b"Hello, World!", BodyLength::ContentLength(13), &config).unwrap();
        assert_eq!(result.unwrap(), b"Hello, World!");
    }

    #[test]
    fn parse_body_content_length_zero() {
        let config = BodyConfig::default();
        let result = parse_body(b"", BodyLength::ContentLength(0), &config).unwrap();
        assert_eq!(result.unwrap(), b"");
    }

    #[test]
    fn parse_body_chunked() {
        let config = BodyConfig::default();
        let result = parse_body(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config).unwrap();
        assert_eq!(result.unwrap(), b"Hello");
    }

    #[test]
    fn parse_body_with_consumed_content_length() {
        let config = BodyConfig::default();
        let (body, consumed) =
            parse_body_with_consumed(b"Hello, World!", BodyLength::ContentLength(13), &config)
                .unwrap();
        assert_eq!(body.unwrap(), b"Hello, World!");
        assert_eq!(consumed, 13);
    }

    #[test]
    fn parse_body_with_consumed_chunked() {
        let config = BodyConfig::default();
        let (body, consumed) =
            parse_body_with_consumed(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config)
                .unwrap();
        assert_eq!(body.unwrap(), b"Hello");
        assert_eq!(consumed, 15);
    }

    #[test]
    fn validate_content_length_ok() {
        let config = BodyConfig::new().with_max_size(1000);
        assert!(validate_content_length(500, &config).is_ok());
        assert!(validate_content_length(1000, &config).is_ok());
    }

    #[test]
    fn validate_content_length_too_large() {
        let config = BodyConfig::new().with_max_size(1000);
        let result = validate_content_length(1001, &config);
        assert!(matches!(
            result,
            Err(BodyError::TooLarge {
                size: 1001,
                max: 1000
            })
        ));
    }

    #[test]
    fn body_error_display() {
        let err = BodyError::TooLarge {
            size: 2000,
            max: 1000,
        };
        assert_eq!(
            format!("{err}"),
            "body too large: 2000 bytes exceeds limit of 1000"
        );
        let err = BodyError::InvalidChunkedEncoding {
            detail: "bad format",
        };
        assert_eq!(format!("{err}"), "invalid chunked encoding: bad format");
        let err = BodyError::Incomplete {
            received: 50,
            expected: Some(100),
        };
        assert_eq!(
            format!("{err}"),
            "incomplete body: received 50 of 100 bytes"
        );
        let err = BodyError::UnexpectedEof;
        assert_eq!(format!("{err}"), "unexpected end of body");
    }

    // ---- StreamingBodyConfig ----

    #[test]
    fn streaming_body_config_defaults() {
        let config = StreamingBodyConfig::default();
        assert_eq!(config.streaming_threshold, DEFAULT_STREAMING_THRESHOLD);
        assert_eq!(config.chunk_size, 8 * 1024);
        assert_eq!(config.max_size, DEFAULT_MAX_BODY_SIZE);
    }

    #[test]
    fn streaming_body_config_custom() {
        let config = StreamingBodyConfig::new()
            .with_streaming_threshold(1024)
            .with_chunk_size(4096)
            .with_max_size(10_000);
        assert_eq!(config.streaming_threshold, 1024);
        assert_eq!(config.chunk_size, 4096);
        assert_eq!(config.max_size, 10_000);
    }

    #[test]
    fn streaming_body_config_minimum_chunk_size() {
        let config = StreamingBodyConfig::new().with_chunk_size(0);
        assert_eq!(config.chunk_size, 1);
    }

    #[test]
    fn streaming_body_config_should_stream() {
        let config = StreamingBodyConfig::new().with_streaming_threshold(1000);
        assert!(!config.should_stream(500));
        assert!(!config.should_stream(1000));
        assert!(config.should_stream(1001));
        assert!(config.should_stream(10000));
    }

    // ---- Async streams ----

    #[test]
    fn async_content_length_stream_from_buffer() {
        let buffer = b"Hello, World!".to_vec();
        let config = StreamingBodyConfig::new().with_chunk_size(5);
        let mut stream = AsyncContentLengthStream::new(buffer, EmptyReader, 13, &config);
        assert_eq!(stream.expected_size(), 13);
        assert_eq!(stream.bytes_read(), 0);
        assert_eq!(stream.remaining(), 13);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"Hello");
            }
            _ => panic!("expected chunk"),
        }
        assert_eq!(stream.bytes_read(), 5);
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b", Wor");
            }
            _ => panic!("expected chunk"),
        }
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"ld!");
            }
            _ => panic!("expected chunk"),
        }
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        assert!(matches!(result, Poll::Ready(None)));
        assert!(stream.is_complete());
    }

    #[test]
    fn async_content_length_stream_enforces_max_size() {
        use std::io::Cursor;
        let initial = b"123456".to_vec();
        let reader = Cursor::new(b"abcdef".to_vec());
        let config = StreamingBodyConfig::new()
            .with_chunk_size(8)
            .with_max_size(10);
        let mut stream = AsyncContentLengthStream::new(initial, reader, 12, &config);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => assert_eq!(chunk, b"123456"),
            _ => panic!("expected initial chunk"),
        }
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => assert_eq!(chunk, b"abcd"),
            _ => panic!("expected bounded reader chunk"),
        }
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge { received, max }))) => {
                assert_eq!(received, 11);
                assert_eq!(max, 10);
            }
            _ => panic!("expected TooLarge error, got {:?}", result),
        }
    }

    #[test]
    fn async_chunked_stream_simple() {
        let buffer = b"5\r\nHello\r\n0\r\n\r\n".to_vec();
        let config = StreamingBodyConfig::new().with_chunk_size(1024);
        let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"Hello");
            }
            _ => panic!("expected chunk, got {:?}", result),
        }
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        assert!(matches!(result, Poll::Ready(None)));
        assert!(stream.is_complete());
    }

    #[test]
    fn async_chunked_stream_multiple_chunks() {
        let buffer = b"5\r\nHello\r\n8\r\n, World!\r\n0\r\n\r\n".to_vec();
        let config = StreamingBodyConfig::new();
        let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut collected = Vec::new();
        loop {
            match Pin::new(&mut stream).poll_next(&mut cx) {
                Poll::Ready(Some(Ok(chunk))) => collected.extend_from_slice(&chunk),
                Poll::Ready(Some(Err(e))) => panic!("unexpected error: {e}"),
                Poll::Ready(None) => break,
                Poll::Pending => {}
            }
        }
        assert_eq!(collected, b"Hello, World!");
    }
}