Skip to main content

fastapi_http/
body.rs

1//! HTTP request body handling.
2//!
3//! This module provides body reading with support for:
4//! - Content-Length based reading with size limits
5//! - Chunked transfer encoding parsing
6//! - Streaming API for large bodies
7//! - Integration points for async I/O
8//!
9//! # Body Size Limits
10//!
11//! By default, bodies are limited to 1MB. This prevents denial-of-service
12//! attacks via large payloads. The limit is configurable per-request.
13//!
14//! # Streaming
15//!
16//! Large bodies can be read incrementally via the body reader API,
17//! which supports async I/O integration with checkpoints.
18//!
19//! # Example
20//!
21//! ```ignore
22//! use fastapi_http::body::{BodyConfig, ContentLengthReader};
23//!
24//! let config = BodyConfig::default().with_max_size(1024 * 1024);
25//! let mut reader = ContentLengthReader::new(body_bytes, 100, &config)?;
26//!
27//! let body = reader.read_all()?;
28//! ```
29
30use crate::parser::{BodyLength, ParseError};
31
/// Default maximum body size (1MB).
pub const DEFAULT_MAX_BODY_SIZE: usize = 1024 * 1024;

/// Configuration for body reading.
///
/// Controls the maximum accepted body size and the starting capacity of
/// decode buffers. Construct via [`BodyConfig::new`] or [`Default`] and
/// customize with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct BodyConfig {
    /// Maximum body size in bytes; larger bodies are rejected with
    /// `BodyError::TooLarge` before any data is copied.
    max_size: usize,
    /// Initial buffer capacity (bytes) for decode output buffers.
    initial_capacity: usize,
}
43
44impl Default for BodyConfig {
45    fn default() -> Self {
46        Self {
47            max_size: DEFAULT_MAX_BODY_SIZE,
48            initial_capacity: 4096,
49        }
50    }
51}
52
53impl BodyConfig {
54    /// Create a new body configuration with default settings.
55    #[must_use]
56    pub fn new() -> Self {
57        Self::default()
58    }
59
60    /// Set the maximum body size.
61    #[must_use]
62    pub fn with_max_size(mut self, size: usize) -> Self {
63        self.max_size = size;
64        self
65    }
66
67    /// Set the initial buffer capacity.
68    #[must_use]
69    pub fn with_initial_capacity(mut self, capacity: usize) -> Self {
70        self.initial_capacity = capacity;
71        self
72    }
73
74    /// Returns the maximum body size.
75    #[must_use]
76    pub fn max_size(&self) -> usize {
77        self.max_size
78    }
79
80    /// Returns the initial buffer capacity.
81    #[must_use]
82    pub fn initial_capacity(&self) -> usize {
83        self.initial_capacity
84    }
85}
86
/// Error types for body reading.
///
/// Produced by [`ContentLengthReader`], [`ChunkedReader`], and the
/// free-standing body-parsing functions in this module.
#[derive(Debug)]
pub enum BodyError {
    /// Body exceeds maximum allowed size.
    TooLarge {
        /// The declared or actual size.
        size: usize,
        /// The maximum allowed size.
        max: usize,
    },
    /// Invalid chunked encoding.
    InvalidChunkedEncoding {
        /// Description of the error.
        detail: &'static str,
    },
    /// Incomplete body (need more data). Incremental callers can treat
    /// this as "buffer more bytes and retry".
    Incomplete {
        /// Bytes received so far.
        received: usize,
        /// Expected total size (if known; `None` for chunked bodies,
        /// whose total length is not known up front).
        expected: Option<usize>,
    },
    /// Unexpected end of input.
    UnexpectedEof,
    /// Parse error from underlying parser.
    Parse(ParseError),
}
114
115impl std::fmt::Display for BodyError {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self {
118            Self::TooLarge { size, max } => {
119                write!(f, "body too large: {size} bytes exceeds limit of {max}")
120            }
121            Self::InvalidChunkedEncoding { detail } => {
122                write!(f, "invalid chunked encoding: {detail}")
123            }
124            Self::Incomplete { received, expected } => {
125                if let Some(exp) = expected {
126                    write!(f, "incomplete body: received {received} of {exp} bytes")
127                } else {
128                    write!(f, "incomplete body: received {received} bytes")
129                }
130            }
131            Self::UnexpectedEof => write!(f, "unexpected end of body"),
132            Self::Parse(e) => write!(f, "parse error: {e}"),
133        }
134    }
135}
136
137impl std::error::Error for BodyError {
138    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
139        match self {
140            Self::Parse(e) => Some(e),
141            _ => None,
142        }
143    }
144}
145
146impl From<ParseError> for BodyError {
147    fn from(e: ParseError) -> Self {
148        Self::Parse(e)
149    }
150}
151
152// ============================================================================
153// Content-Length Body Reading
154// ============================================================================
155
/// Reads a body with a known Content-Length.
///
/// This reader validates that exactly `length` bytes are provided and
/// enforces the configured size limit.
#[derive(Debug)]
pub struct ContentLengthReader<'a> {
    /// Buffer holding (at least a prefix of) the raw body bytes.
    buffer: &'a [u8],
    /// Declared Content-Length in bytes.
    length: usize,
    /// Number of body bytes already consumed via `read`/`read_all`.
    position: usize,
    // Stored for potential future use (streaming chunk size configuration)
    #[allow(dead_code)]
    config: BodyConfig,
}
169
170impl<'a> ContentLengthReader<'a> {
171    /// Create a new Content-Length reader.
172    ///
173    /// # Arguments
174    ///
175    /// * `buffer` - The buffer containing body bytes
176    /// * `length` - The Content-Length value
177    /// * `config` - Body reading configuration
178    ///
179    /// # Errors
180    ///
181    /// Returns `BodyError::TooLarge` if `length` exceeds the configured maximum.
182    pub fn new(buffer: &'a [u8], length: usize, config: &BodyConfig) -> Result<Self, BodyError> {
183        // Check size limit before reading
184        if length > config.max_size {
185            return Err(BodyError::TooLarge {
186                size: length,
187                max: config.max_size,
188            });
189        }
190
191        Ok(Self {
192            buffer,
193            length,
194            position: 0,
195            config: config.clone(),
196        })
197    }
198
199    /// Returns the expected body length.
200    #[must_use]
201    pub fn length(&self) -> usize {
202        self.length
203    }
204
205    /// Returns the number of bytes remaining.
206    #[must_use]
207    pub fn remaining(&self) -> usize {
208        self.length.saturating_sub(self.position)
209    }
210
211    /// Returns true if all bytes have been read.
212    #[must_use]
213    pub fn is_complete(&self) -> bool {
214        self.position >= self.length
215    }
216
217    /// Read up to `max_bytes` into the provided buffer.
218    ///
219    /// Returns the number of bytes read.
220    pub fn read(&mut self, dest: &mut [u8]) -> Result<usize, BodyError> {
221        if self.is_complete() {
222            return Ok(0);
223        }
224
225        let available = self.buffer.len().saturating_sub(self.position);
226        let to_read = dest.len().min(self.remaining()).min(available);
227
228        if to_read == 0 && !self.is_complete() {
229            return Err(BodyError::Incomplete {
230                received: self.position,
231                expected: Some(self.length),
232            });
233        }
234
235        dest[..to_read].copy_from_slice(&self.buffer[self.position..self.position + to_read]);
236        self.position += to_read;
237
238        Ok(to_read)
239    }
240
241    /// Read all remaining body bytes.
242    ///
243    /// # Errors
244    ///
245    /// Returns `BodyError::Incomplete` if the buffer doesn't contain enough data.
246    pub fn read_all(&mut self) -> Result<Vec<u8>, BodyError> {
247        if self.buffer.len() < self.length {
248            return Err(BodyError::Incomplete {
249                received: self.buffer.len(),
250                expected: Some(self.length),
251            });
252        }
253
254        let body = self.buffer[..self.length].to_vec();
255        self.position = self.length;
256        Ok(body)
257    }
258
259    /// Read all remaining body bytes as a borrowed slice.
260    ///
261    /// This is zero-copy when the entire body is already in the buffer.
262    ///
263    /// # Errors
264    ///
265    /// Returns `BodyError::Incomplete` if the buffer doesn't contain enough data.
266    pub fn read_all_borrowed(&self) -> Result<&'a [u8], BodyError> {
267        if self.buffer.len() < self.length {
268            return Err(BodyError::Incomplete {
269                received: self.buffer.len(),
270                expected: Some(self.length),
271            });
272        }
273
274        Ok(&self.buffer[..self.length])
275    }
276}
277
278// ============================================================================
279// Chunked Transfer Encoding
280// ============================================================================
281
/// State machine for chunked encoding parsing.
///
/// Drives [`ChunkedReader::decode_all`]. Transitions:
/// `ChunkSize -> ChunkData -> ChunkDataEnd -> ChunkSize -> ...`, with a
/// zero-length final chunk routing `ChunkSize -> Trailers -> Complete`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkedState {
    /// Expecting chunk size line.
    ChunkSize,
    /// Reading chunk data; `remaining` counts undecoded bytes of this chunk.
    ChunkData { remaining: usize },
    /// Expecting CRLF after chunk data.
    ChunkDataEnd,
    /// Reading trailers (after final chunk).
    Trailers,
    /// Complete.
    Complete,
}
296
/// Parses chunked transfer encoding.
///
/// Chunked encoding format:
/// ```text
/// chunk-size CRLF
/// chunk-data CRLF
/// ...
/// 0 CRLF
/// [trailers] CRLF
/// ```
#[derive(Debug)]
pub struct ChunkedReader<'a> {
    /// Raw (still-encoded) input buffer.
    buffer: &'a [u8],
    /// Read offset into `buffer` (raw bytes consumed so far).
    position: usize,
    /// Current state-machine state.
    state: ChunkedState,
    /// Total decoded payload bytes produced so far.
    total_size: usize,
    /// Size limit and initial output-buffer capacity.
    config: BodyConfig,
}
315
316impl<'a> ChunkedReader<'a> {
317    /// Create a new chunked reader.
318    ///
319    /// # Arguments
320    ///
321    /// * `buffer` - The buffer containing chunked body data
322    /// * `config` - Body reading configuration
323    #[must_use]
324    pub fn new(buffer: &'a [u8], config: &BodyConfig) -> Self {
325        Self {
326            buffer,
327            position: 0,
328            state: ChunkedState::ChunkSize,
329            total_size: 0,
330            config: config.clone(),
331        }
332    }
333
334    /// Returns true if parsing is complete.
335    #[must_use]
336    pub fn is_complete(&self) -> bool {
337        self.state == ChunkedState::Complete
338    }
339
340    /// Returns the total decoded body size so far.
341    #[must_use]
342    pub fn total_size(&self) -> usize {
343        self.total_size
344    }
345
346    /// Parse the chunk size line and return the size.
347    fn parse_chunk_size(&self) -> Result<(usize, usize), BodyError> {
348        let remaining = &self.buffer[self.position..];
349
350        // Find CRLF
351        let line_end =
352            remaining
353                .windows(2)
354                .position(|w| w == b"\r\n")
355                .ok_or(BodyError::Incomplete {
356                    received: self.position,
357                    expected: None,
358                })?;
359
360        let size_line = &remaining[..line_end];
361
362        // Parse hex size (ignore chunk extensions after semicolon)
363        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';') {
364            &size_line[..semi]
365        } else {
366            size_line
367        };
368
369        let size_str =
370            std::str::from_utf8(size_str).map_err(|_| BodyError::InvalidChunkedEncoding {
371                detail: "invalid UTF-8 in chunk size",
372            })?;
373
374        let size = usize::from_str_radix(size_str.trim(), 16).map_err(|_| {
375            BodyError::InvalidChunkedEncoding {
376                detail: "invalid hex chunk size",
377            }
378        })?;
379
380        // Reject unreasonably large chunk sizes early to prevent DoS.
381        // Individual chunks over 16MB are almost certainly attacks.
382        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
383        if size > MAX_SINGLE_CHUNK {
384            return Err(BodyError::InvalidChunkedEncoding {
385                detail: "chunk size exceeds 16MB limit",
386            });
387        }
388
389        // bytes_consumed = size_line + CRLF
390        Ok((size, line_end + 2))
391    }
392
393    /// Decode all chunks into a single buffer.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if:
398    /// - The total size exceeds the configured limit
399    /// - The chunked encoding is malformed
400    /// - The buffer is incomplete
401    pub fn decode_all(&mut self) -> Result<Vec<u8>, BodyError> {
402        let mut output = Vec::with_capacity(self.config.initial_capacity);
403
404        loop {
405            match self.state {
406                ChunkedState::ChunkSize => {
407                    let (size, consumed) = self.parse_chunk_size()?;
408                    self.position += consumed;
409
410                    // Check size limit
411                    let new_total = self.total_size.saturating_add(size);
412                    if new_total > self.config.max_size {
413                        return Err(BodyError::TooLarge {
414                            size: new_total,
415                            max: self.config.max_size,
416                        });
417                    }
418
419                    if size == 0 {
420                        // Final chunk - transition to trailers
421                        self.state = ChunkedState::Trailers;
422                    } else {
423                        self.state = ChunkedState::ChunkData { remaining: size };
424                    }
425                }
426                ChunkedState::ChunkData { remaining } => {
427                    let available = self.buffer.len().saturating_sub(self.position);
428                    if available < remaining {
429                        return Err(BodyError::Incomplete {
430                            received: self.total_size + (remaining - available),
431                            expected: None,
432                        });
433                    }
434
435                    // Copy chunk data
436                    let chunk_data = &self.buffer[self.position..self.position + remaining];
437                    output.extend_from_slice(chunk_data);
438                    self.position += remaining;
439                    self.total_size += remaining;
440
441                    self.state = ChunkedState::ChunkDataEnd;
442                }
443                ChunkedState::ChunkDataEnd => {
444                    // Expect CRLF
445                    let remaining = &self.buffer[self.position..];
446                    if remaining.len() < 2 {
447                        return Err(BodyError::Incomplete {
448                            received: self.total_size,
449                            expected: None,
450                        });
451                    }
452
453                    if &remaining[..2] != b"\r\n" {
454                        return Err(BodyError::InvalidChunkedEncoding {
455                            detail: "expected CRLF after chunk data",
456                        });
457                    }
458
459                    self.position += 2;
460                    self.state = ChunkedState::ChunkSize;
461                }
462                ChunkedState::Trailers => {
463                    // Skip trailers until empty line
464                    let remaining = &self.buffer[self.position..];
465
466                    // Look for CRLF (empty line) or trailer headers
467                    if remaining.starts_with(b"\r\n") {
468                        self.position += 2;
469                        self.state = ChunkedState::Complete;
470                    } else {
471                        // Find end of trailer line
472                        let line_end = remaining.windows(2).position(|w| w == b"\r\n");
473                        match line_end {
474                            Some(pos) => {
475                                // Skip this trailer
476                                self.position += pos + 2;
477                                // Stay in Trailers state to handle more trailers or final CRLF
478                            }
479                            None => {
480                                return Err(BodyError::Incomplete {
481                                    received: self.total_size,
482                                    expected: None,
483                                });
484                            }
485                        }
486                    }
487                }
488                ChunkedState::Complete => {
489                    break;
490                }
491            }
492        }
493
494        Ok(output)
495    }
496
497    /// Returns the number of bytes consumed from the raw buffer.
498    #[must_use]
499    pub fn bytes_consumed(&self) -> usize {
500        self.position
501    }
502}
503
504// ============================================================================
505// Body Parsing from Headers
506// ============================================================================
507
508/// Parse a request body from a buffer given the body length indicator.
509///
510/// This is the main entry point for body parsing. It dispatches to the
511/// appropriate reader based on Content-Length or Transfer-Encoding.
512///
513/// # Arguments
514///
515/// * `buffer` - The buffer containing the body (after headers)
516/// * `body_length` - The body length indicator from header parsing
517/// * `config` - Body reading configuration
518///
519/// # Returns
520///
521/// Returns the parsed body bytes, or `None` if no body is expected.
522///
523/// # Errors
524///
525/// Returns an error if:
526/// - The body exceeds the configured size limit
527/// - The chunked encoding is malformed
528/// - The buffer is incomplete
529pub fn parse_body(
530    buffer: &[u8],
531    body_length: BodyLength,
532    config: &BodyConfig,
533) -> Result<Option<Vec<u8>>, BodyError> {
534    let (body, _) = parse_body_with_consumed(buffer, body_length, config)?;
535    Ok(body)
536}
537
538/// Parse a request body and return both the decoded body and bytes consumed.
539///
540/// This is useful for incremental parsing to determine request boundaries.
541///
542/// # Errors
543///
544/// Returns an error if:
545/// - The body exceeds the configured size limit
546/// - The chunked encoding is malformed
547/// - The buffer is incomplete
548pub fn parse_body_with_consumed(
549    buffer: &[u8],
550    body_length: BodyLength,
551    config: &BodyConfig,
552) -> Result<(Option<Vec<u8>>, usize), BodyError> {
553    match body_length {
554        BodyLength::None => Ok((None, 0)),
555        BodyLength::ContentLength(len) => {
556            if len == 0 {
557                return Ok((Some(Vec::new()), 0));
558            }
559            let mut reader = ContentLengthReader::new(buffer, len, config)?;
560            let body = reader.read_all()?;
561            Ok((Some(body), len))
562        }
563        BodyLength::Chunked => {
564            let mut reader = ChunkedReader::new(buffer, config);
565            let body = reader.decode_all()?;
566            Ok((Some(body), reader.bytes_consumed()))
567        }
568        BodyLength::Conflicting => Err(BodyError::InvalidChunkedEncoding {
569            detail: "conflicting body length indicators",
570        }),
571    }
572}
573
574/// Validates body size against Content-Length header before reading.
575///
576/// Call this early (before buffering) to reject oversized requests.
577///
578/// # Arguments
579///
580/// * `content_length` - The Content-Length header value
581/// * `config` - Body reading configuration
582///
583/// # Errors
584///
585/// Returns `BodyError::TooLarge` if the content length exceeds the limit.
586pub fn validate_content_length(
587    content_length: usize,
588    config: &BodyConfig,
589) -> Result<(), BodyError> {
590    if content_length > config.max_size {
591        return Err(BodyError::TooLarge {
592            size: content_length,
593            max: config.max_size,
594        });
595    }
596    Ok(())
597}
598
599// ============================================================================
600// Async Streaming Body Readers
601// ============================================================================
602
603use asupersync::io::AsyncRead;
604use asupersync::stream::Stream;
605use fastapi_core::RequestBodyStreamError;
606use std::pin::Pin;
607use std::task::{Context, Poll};
608
/// Default threshold for enabling streaming (64KB).
///
/// Bodies larger than this will be streamed rather than buffered entirely.
pub const DEFAULT_STREAMING_THRESHOLD: usize = 64 * 1024;

/// Configuration for async body streaming.
///
/// Used by [`AsyncContentLengthStream`] and [`AsyncChunkedStream`];
/// [`StreamingBodyConfig::should_stream`] decides buffer-vs-stream.
#[derive(Debug, Clone)]
pub struct StreamingBodyConfig {
    /// Threshold above which bodies are streamed instead of buffered.
    pub streaming_threshold: usize,
    /// Size of each read chunk (also the scratch-buffer allocation size).
    pub chunk_size: usize,
    /// Maximum body size (enforced during streaming).
    pub max_size: usize,
}
624
625impl Default for StreamingBodyConfig {
626    fn default() -> Self {
627        Self {
628            streaming_threshold: DEFAULT_STREAMING_THRESHOLD,
629            chunk_size: 8 * 1024, // 8KB chunks
630            max_size: DEFAULT_MAX_BODY_SIZE,
631        }
632    }
633}
634
635impl StreamingBodyConfig {
636    /// Create a new streaming config with default settings.
637    #[must_use]
638    pub fn new() -> Self {
639        Self::default()
640    }
641
642    /// Set the streaming threshold.
643    #[must_use]
644    pub fn with_streaming_threshold(mut self, threshold: usize) -> Self {
645        self.streaming_threshold = threshold;
646        self
647    }
648
649    /// Set the chunk size for reads.
650    ///
651    /// Note: For network efficiency, values below 1KB are allowed but not recommended
652    /// for production use.
653    #[must_use]
654    pub fn with_chunk_size(mut self, size: usize) -> Self {
655        self.chunk_size = size.max(1); // Minimum 1 byte (for testing)
656        self
657    }
658
659    /// Set the maximum body size.
660    #[must_use]
661    pub fn with_max_size(mut self, size: usize) -> Self {
662        self.max_size = size;
663        self
664    }
665
666    /// Returns true if the given content length should be streamed.
667    #[must_use]
668    pub fn should_stream(&self, content_length: usize) -> bool {
669        content_length > self.streaming_threshold
670    }
671}
672
/// An async stream that reads a Content-Length body in chunks.
///
/// This stream first yields any buffered data from the parser, then
/// continues reading from the underlying async reader until the
/// expected length is reached.
///
/// # Memory Efficiency
///
/// Only one chunk is buffered at a time, making this suitable for
/// streaming large request bodies without excessive memory usage.
pub struct AsyncContentLengthStream<R> {
    /// Optional reader for more data (None after initial buffer exhausted and no reader).
    /// The reader is taken out during a poll and restored afterwards;
    /// it is deliberately dropped on fatal error paths.
    reader: Option<R>,
    /// Initial buffer from parser (yielded before any socket reads).
    initial_buffer: Vec<u8>,
    /// Position in initial buffer.
    initial_position: usize,
    /// Expected total size from Content-Length.
    expected_size: usize,
    /// Bytes read so far (from both the initial buffer and the reader).
    bytes_read: usize,
    /// Chunk size for reads.
    chunk_size: usize,
    /// Maximum allowed size.
    max_size: usize,
    /// Read buffer (reused across reads to avoid per-poll allocation).
    read_buffer: Vec<u8>,
    /// Whether the stream is complete; once set, polls yield `None`.
    complete: bool,
    /// Whether an error occurred; once set, polls yield `None` (fused).
    error: bool,
}
705
706impl<R> AsyncContentLengthStream<R>
707where
708    R: AsyncRead + Unpin + Send + Sync + 'static,
709{
710    /// Create a new Content-Length stream.
711    ///
712    /// # Arguments
713    ///
714    /// * `initial_buffer` - Any bytes already buffered by the parser
715    /// * `reader` - The async reader for remaining bytes
716    /// * `content_length` - Expected total body size
717    /// * `config` - Streaming configuration
718    pub fn new(
719        initial_buffer: Vec<u8>,
720        reader: R,
721        content_length: usize,
722        config: &StreamingBodyConfig,
723    ) -> Self {
724        Self {
725            reader: Some(reader),
726            initial_buffer,
727            initial_position: 0,
728            expected_size: content_length,
729            bytes_read: 0,
730            chunk_size: config.chunk_size,
731            max_size: config.max_size,
732            read_buffer: vec![0u8; config.chunk_size],
733            complete: false,
734            error: false,
735        }
736    }
737
738    /// Create a Content-Length stream with default config.
739    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R, content_length: usize) -> Self {
740        Self::new(
741            initial_buffer,
742            reader,
743            content_length,
744            &StreamingBodyConfig::default(),
745        )
746    }
747
748    /// Returns the expected total size.
749    #[must_use]
750    pub fn expected_size(&self) -> usize {
751        self.expected_size
752    }
753
754    /// Returns the number of bytes read so far.
755    #[must_use]
756    pub fn bytes_read(&self) -> usize {
757        self.bytes_read
758    }
759
760    /// Returns the remaining bytes to read.
761    #[must_use]
762    pub fn remaining(&self) -> usize {
763        self.expected_size.saturating_sub(self.bytes_read)
764    }
765
766    /// Returns true if the stream is complete.
767    #[must_use]
768    pub fn is_complete(&self) -> bool {
769        self.complete
770    }
771
772    fn initial_remaining(&self) -> usize {
773        self.initial_buffer
774            .len()
775            .saturating_sub(self.initial_position)
776    }
777}
778
impl<R> Stream for AsyncContentLengthStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    // Yields body chunks in order: slices of `initial_buffer` first, then
    // reads from `reader` until `expected_size` bytes have been produced.
    // After completion or an error the stream is fused: it returns `None`
    // on every subsequent poll (the `complete`/`error` latches below).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Check if complete or error
        if self.complete || self.error {
            return Poll::Ready(None);
        }

        // Check if we've read all expected bytes
        if self.bytes_read >= self.expected_size {
            self.complete = true;
            return Poll::Ready(None);
        }

        // Check size limit. NOTE(review): this only triggers after
        // `bytes_read` has already exceeded `max_size`; callers are
        // expected to validate the Content-Length up front, so this is a
        // defensive backstop — confirm against call sites.
        if self.bytes_read > self.max_size {
            self.error = true;
            let bytes_read = self.bytes_read;
            let max_size = self.max_size;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: bytes_read,
                max: max_size,
            })));
        }

        // First, try to yield from initial buffer
        let initial_remaining = self.initial_remaining();
        if initial_remaining > 0 {
            // Cap the slice at both the configured chunk size and the
            // number of body bytes still expected.
            let remaining_for_body = self.expected_size.saturating_sub(self.bytes_read);
            let chunk_size = self
                .chunk_size
                .min(initial_remaining)
                .min(remaining_for_body);

            if chunk_size > 0 {
                let start = self.initial_position;
                let chunk = self.initial_buffer[start..start + chunk_size].to_vec();
                self.initial_position += chunk_size;
                self.bytes_read += chunk_size;
                return Poll::Ready(Some(Ok(chunk)));
            }
        }

        // Initial buffer exhausted, read from reader
        let remaining = self.expected_size.saturating_sub(self.bytes_read);
        let to_read = self.chunk_size.min(remaining);

        if to_read == 0 {
            self.complete = true;
            return Poll::Ready(None);
        }

        // Ensure buffer is sized appropriately
        if self.read_buffer.len() < to_read {
            self.read_buffer.resize(to_read, 0);
        }

        // Take reader temporarily to avoid borrow conflicts.
        // If it is already gone, a previous poll failed to restore it
        // (or none was ever supplied) — treat as a closed connection.
        let mut reader = match self.reader.take() {
            Some(r) => r,
            None => {
                self.error = true;
                return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
            }
        };

        // Perform the read and extract result before modifying self
        let read_result = {
            let mut read_buf = asupersync::io::ReadBuf::new(&mut self.read_buffer[..to_read]);
            match Pin::new(&mut reader).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    // Copy out what was filled; `n` may be less than `to_read`.
                    let n = read_buf.filled().len();
                    let chunk = read_buf.filled().to_vec();
                    Poll::Ready(Ok((n, chunk)))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        };

        match read_result {
            Poll::Ready(Ok((n, chunk))) => {
                if n == 0 {
                    // EOF before expected bytes - incomplete body.
                    // The reader is intentionally dropped here: the stream
                    // is latched into the error state and never reads again.
                    self.error = true;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
                }

                self.bytes_read += n;

                // Put reader back
                self.reader = Some(reader);

                Poll::Ready(Some(Ok(chunk)))
            }
            Poll::Ready(Err(e)) => {
                // I/O failure: latch the error; reader is dropped.
                self.error = true;
                Poll::Ready(Some(Err(RequestBodyStreamError::Io(e.to_string()))))
            }
            Poll::Pending => {
                // Put reader back before returning Pending
                self.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}
890
/// Parsing state for chunked encoding.
///
/// NOTE(review): unlike the synchronous `ChunkedState`, this enum has no
/// `Trailers` state — trailer handling for the async path is presumably
/// elsewhere or not yet implemented; confirm before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncChunkedState {
    /// Parsing chunk size line.
    ChunkSize,
    /// Reading chunk data; `remaining` counts undecoded bytes of this chunk.
    ChunkData { remaining: usize },
    /// Expecting CRLF after chunk data.
    ChunkDataEnd,
    /// Complete.
    Complete,
    /// Error (terminal).
    Error,
}
905
/// An async stream that reads a chunked-encoded body.
///
/// This stream parses chunked transfer encoding on the fly,
/// yielding decoded chunks as they become available.
///
/// # Chunked Encoding Format
///
/// ```text
/// chunk-size CRLF
/// chunk-data CRLF
/// ...
/// 0 CRLF
/// [trailers] CRLF
/// ```
pub struct AsyncChunkedStream<R> {
    /// Reader for more data (used when buffer is exhausted).
    #[allow(dead_code)]
    reader: Option<R>,
    /// Parsing state.
    state: AsyncChunkedState,
    /// Total decoded bytes so far.
    bytes_decoded: usize,
    /// Maximum allowed size (decoded bytes).
    max_size: usize,
    /// Chunk size for reads.
    chunk_size: usize,
    /// Read buffer (used when socket reads are needed).
    #[allow(dead_code)]
    read_buffer: Vec<u8>,
    /// Buffer for initial data from parser + any data read from socket.
    buffer: Vec<u8>,
    /// Position in buffer (raw/encoded bytes consumed).
    position: usize,
}
940
941impl<R> AsyncChunkedStream<R>
942where
943    R: AsyncRead + Unpin + Send + Sync + 'static,
944{
945    /// Create a new chunked stream.
946    ///
947    /// # Arguments
948    ///
949    /// * `initial_buffer` - Any bytes already buffered by the parser
950    /// * `reader` - The async reader for remaining bytes
951    /// * `config` - Streaming configuration
952    ///
953    /// # Panics
954    ///
955    /// Panics if `initial_buffer` exceeds `config.max_size`. Use `try_new` for
956    /// fallible construction.
957    pub fn new(initial_buffer: Vec<u8>, reader: R, config: &StreamingBodyConfig) -> Self {
958        assert!(
959            initial_buffer.len() <= config.max_size,
960            "initial buffer size {} exceeds max size {}",
961            initial_buffer.len(),
962            config.max_size
963        );
964        Self {
965            reader: Some(reader),
966            state: AsyncChunkedState::ChunkSize,
967            bytes_decoded: 0,
968            max_size: config.max_size,
969            chunk_size: config.chunk_size,
970            read_buffer: vec![0u8; config.chunk_size],
971            buffer: initial_buffer,
972            position: 0,
973        }
974    }
975
976    /// Try to create a new chunked stream, returning error if initial buffer is too large.
977    ///
978    /// # Errors
979    ///
980    /// Returns error if `initial_buffer.len()` exceeds `config.max_size`.
981    pub fn try_new(
982        initial_buffer: Vec<u8>,
983        reader: R,
984        config: &StreamingBodyConfig,
985    ) -> Result<Self, RequestBodyStreamError> {
986        if initial_buffer.len() > config.max_size {
987            return Err(RequestBodyStreamError::Io(format!(
988                "initial buffer size {} exceeds max size {}",
989                initial_buffer.len(),
990                config.max_size
991            )));
992        }
993        Ok(Self {
994            reader: Some(reader),
995            state: AsyncChunkedState::ChunkSize,
996            bytes_decoded: 0,
997            max_size: config.max_size,
998            chunk_size: config.chunk_size,
999            read_buffer: vec![0u8; config.chunk_size],
1000            buffer: initial_buffer,
1001            position: 0,
1002        })
1003    }
1004
1005    /// Create a chunked stream with default config.
1006    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R) -> Self {
1007        Self::new(initial_buffer, reader, &StreamingBodyConfig::default())
1008    }
1009
1010    /// Returns the total decoded bytes so far.
1011    #[must_use]
1012    pub fn bytes_decoded(&self) -> usize {
1013        self.bytes_decoded
1014    }
1015
1016    /// Returns true if the stream is complete.
1017    #[must_use]
1018    pub fn is_complete(&self) -> bool {
1019        self.state == AsyncChunkedState::Complete
1020    }
1021
1022    /// Get remaining buffer bytes.
1023    fn buffer_remaining(&self) -> &[u8] {
1024        &self.buffer[self.position..]
1025    }
1026
1027    /// Consume bytes from buffer.
1028    fn consume(&mut self, n: usize) {
1029        self.position += n;
1030    }
1031}
1032
impl<R> Stream for AsyncChunkedStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    /// Drive the chunked-decoding state machine against the internal buffer.
    ///
    /// Yields one decoded data chunk per `Ready(Some(Ok(..)))`. Note that this
    /// implementation decodes only from `self.buffer`; it never polls
    /// `self.reader`, so running out of buffered bytes mid-parse is reported
    /// as an `Io` error rather than returning `Poll::Pending`.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Terminal states never yield again.
        if self.state == AsyncChunkedState::Complete || self.state == AsyncChunkedState::Error {
            return Poll::Ready(None);
        }

        // Size limit is checked only at poll entry (and with `>`), so a single
        // poll can push bytes_decoded past max_size by up to one chunk; the
        // overflow is then reported on the *next* poll.
        if self.bytes_decoded > self.max_size {
            self.state = AsyncChunkedState::Error;
            // Copy fields out before the borrow in the error constructor.
            let bytes_decoded = self.bytes_decoded;
            let max_size = self.max_size;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: bytes_decoded,
                max: max_size,
            })));
        }

        loop {
            match self.state {
                AsyncChunkedState::ChunkSize => {
                    // Try to find the CRLF-terminated chunk-size line in the buffer.
                    let remaining = self.buffer_remaining();
                    if let Some(crlf_pos) = remaining.windows(2).position(|w| w == b"\r\n") {
                        // Parse chunk size
                        let size_line = &remaining[..crlf_pos];

                        // Parse hex size (ignore extensions after semicolon)
                        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';')
                        {
                            &size_line[..semi]
                        } else {
                            size_line
                        };

                        let size_str = match std::str::from_utf8(size_str) {
                            Ok(s) => s.trim(),
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid UTF-8 in chunk size".to_string(),
                                ))));
                            }
                        };

                        // NOTE(review): `from_str_radix` also accepts a leading `+`,
                        // which RFC 9112 chunk sizes do not allow — confirm whether
                        // stricter validation is wanted here.
                        let chunk_size = match usize::from_str_radix(size_str, 16) {
                            Ok(s) => s,
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid hex chunk size".to_string(),
                                ))));
                            }
                        };

                        // Reject unreasonably large chunk sizes early, independent
                        // of the configurable max_size budget.
                        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
                        if chunk_size > MAX_SINGLE_CHUNK {
                            self.state = AsyncChunkedState::Error;
                            return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                "chunk size exceeds 16MB limit".to_string(),
                            ))));
                        }

                        // Consume the size line plus its CRLF.
                        self.consume(crlf_pos + 2);

                        if chunk_size == 0 {
                            // Final chunk - complete. Any trailers and the final
                            // CRLF are left unconsumed in the buffer.
                            self.state = AsyncChunkedState::Complete;
                            return Poll::Ready(None);
                        }

                        self.state = AsyncChunkedState::ChunkData {
                            remaining: chunk_size,
                        };
                        continue;
                    }

                    // Need more data - for now just error (full impl would read from socket)
                    self.state = AsyncChunkedState::Error;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                        "incomplete chunk size line".to_string(),
                    ))));
                }
                AsyncChunkedState::ChunkData { remaining } => {
                    // Yield at most `chunk_size` bytes of the current chunk from the buffer.
                    let buffer_remaining = self.buffer_remaining();
                    let to_read = remaining.min(buffer_remaining.len()).min(self.chunk_size);

                    if to_read > 0 {
                        let chunk = buffer_remaining[..to_read].to_vec();
                        self.consume(to_read);
                        self.bytes_decoded += to_read;

                        // Either the chunk is done (expect CRLF next) or there is
                        // more of it to yield on subsequent polls.
                        let new_remaining = remaining - to_read;
                        if new_remaining == 0 {
                            self.state = AsyncChunkedState::ChunkDataEnd;
                        } else {
                            self.state = AsyncChunkedState::ChunkData {
                                remaining: new_remaining,
                            };
                        }

                        return Poll::Ready(Some(Ok(chunk)));
                    }

                    // Need more data from socket
                    self.state = AsyncChunkedState::Error;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                        "incomplete chunk data".to_string(),
                    ))));
                }
                AsyncChunkedState::ChunkDataEnd => {
                    // Expect the CRLF that terminates the chunk data.
                    let remaining = self.buffer_remaining();
                    if remaining.len() >= 2 {
                        if &remaining[..2] == b"\r\n" {
                            self.consume(2);
                            self.state = AsyncChunkedState::ChunkSize;
                            continue;
                        }
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "expected CRLF after chunk data".to_string(),
                        ))));
                    }

                    // Need more data
                    self.state = AsyncChunkedState::Error;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                        "incomplete CRLF after chunk".to_string(),
                    ))));
                }
                AsyncChunkedState::Complete | AsyncChunkedState::Error => {
                    // Unreachable via the entry guard, but kept for exhaustiveness.
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1178
1179/// Create a streaming body from a Content-Length body.
1180///
1181/// Returns a `fastapi_core::Body::Stream` that yields chunks from the given
1182/// initial buffer and async reader.
1183///
1184/// # Arguments
1185///
1186/// * `initial_buffer` - Any bytes already buffered by the parser
1187/// * `reader` - The async reader for remaining bytes
1188/// * `content_length` - Expected total body size
1189/// * `config` - Streaming configuration
1190pub fn create_content_length_stream<R>(
1191    initial_buffer: Vec<u8>,
1192    reader: R,
1193    content_length: usize,
1194    config: &StreamingBodyConfig,
1195) -> fastapi_core::Body
1196where
1197    R: AsyncRead + Unpin + Send + Sync + 'static,
1198{
1199    let stream = AsyncContentLengthStream::new(initial_buffer, reader, content_length, config);
1200    fastapi_core::Body::streaming_with_size(stream, content_length)
1201}
1202
1203/// Create a streaming body from a chunked transfer-encoded body.
1204///
1205/// Returns a `fastapi_core::Body::Stream` that yields decoded chunks.
1206///
1207/// # Arguments
1208///
1209/// * `initial_buffer` - Any bytes already buffered by the parser
1210/// * `reader` - The async reader for remaining bytes
1211/// * `config` - Streaming configuration
1212pub fn create_chunked_stream<R>(
1213    initial_buffer: Vec<u8>,
1214    reader: R,
1215    config: &StreamingBodyConfig,
1216) -> fastapi_core::Body
1217where
1218    R: AsyncRead + Unpin + Send + Sync + 'static,
1219{
1220    let stream = AsyncChunkedStream::new(initial_buffer, reader, config);
1221    fastapi_core::Body::streaming(stream)
1222}
1223
1224// ============================================================================
1225// Tests
1226// ============================================================================
1227
#[cfg(test)]
mod tests {
    //! Unit tests for body reading: configuration, Content-Length and
    //! chunked decoding, the `parse_body` helpers, error display, and the
    //! async streaming wrappers (polled manually with a no-op waker).
    use super::*;

    // ========================================================================
    // BodyConfig Tests
    // ========================================================================

    #[test]
    fn body_config_defaults() {
        let config = BodyConfig::default();
        assert_eq!(config.max_size(), DEFAULT_MAX_BODY_SIZE);
        assert_eq!(config.initial_capacity(), 4096);
    }

    #[test]
    fn body_config_custom() {
        let config = BodyConfig::new()
            .with_max_size(2048)
            .with_initial_capacity(1024);
        assert_eq!(config.max_size(), 2048);
        assert_eq!(config.initial_capacity(), 1024);
    }

    // ========================================================================
    // Content-Length Reader Tests
    // ========================================================================

    #[test]
    fn content_length_basic() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();

        assert_eq!(reader.length(), 13);
        assert_eq!(reader.remaining(), 13);
        assert!(!reader.is_complete());

        let result = reader.read_all().unwrap();
        assert_eq!(result, b"Hello, World!");
        assert!(reader.is_complete());
    }

    #[test]
    fn content_length_zero() {
        // A zero-length body is complete before any read.
        let body = b"";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, 0, &config).unwrap();

        assert_eq!(reader.length(), 0);
        assert!(reader.is_complete());

        let result = reader.read_all().unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn content_length_too_large() {
        // The declared length (100), not the buffered bytes, triggers the limit.
        let body = b"small";
        let config = BodyConfig::new().with_max_size(3);
        let result = ContentLengthReader::new(body, 100, &config);

        assert!(matches!(
            result,
            Err(BodyError::TooLarge { size: 100, max: 3 })
        ));
    }

    #[test]
    fn content_length_incomplete() {
        // Declared length exceeds buffered bytes -> Incomplete with counts.
        let body = b"Hello";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, 10, &config).unwrap();

        let result = reader.read_all();
        assert!(matches!(
            result,
            Err(BodyError::Incomplete {
                received: 5,
                expected: Some(10)
            })
        ));
    }

    #[test]
    fn content_length_borrowed() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let reader = ContentLengthReader::new(body, body.len(), &config).unwrap();

        let borrowed = reader.read_all_borrowed().unwrap();
        assert_eq!(borrowed, body);
        // Verify it's the same memory location (zero-copy)
        assert_eq!(borrowed.as_ptr(), body.as_ptr());
    }

    #[test]
    fn content_length_incremental_read() {
        let body = b"Hello, World!";
        let config = BodyConfig::default();
        let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();

        let mut buf = [0u8; 5];

        // First read
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf[..n], b"Hello");
        assert_eq!(reader.remaining(), 8);

        // Second read
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf[..n], b", Wor");
        assert_eq!(reader.remaining(), 3);

        // Third read returns the short tail.
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 3);
        assert_eq!(&buf[..n], b"ld!");
        assert!(reader.is_complete());

        // No more data: read after completion yields 0 bytes, not an error.
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 0);
    }

    // ========================================================================
    // Chunked Encoding Tests
    // ========================================================================

    #[test]
    fn chunked_single_chunk() {
        let body = b"5\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_multiple_chunks() {
        let body = b"5\r\nHello\r\n7\r\n, World\r\n1\r\n!\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello, World!");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_empty() {
        // Just the terminating zero-size chunk: empty body, still complete.
        let body = b"0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert!(result.is_empty());
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_with_extension() {
        // Chunk extensions should be ignored
        let body = b"5;ext=value\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
    }

    #[test]
    fn chunked_with_trailers() {
        // Trailer headers after the final chunk are skipped, not returned.
        let body = b"5\r\nHello\r\n0\r\nTrailer: value\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result, b"Hello");
        assert!(reader.is_complete());
    }

    #[test]
    fn chunked_hex_sizes() {
        // Test various hex chunk sizes (both lower- and upper-case digits).
        let body = b"a\r\n0123456789\r\nF\r\n0123456789ABCDE\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all().unwrap();
        assert_eq!(result.len(), 10 + 15); // a=10, F=15
    }

    #[test]
    fn chunked_too_large() {
        let body = b"10\r\n0123456789ABCDEF\r\n0\r\n\r\n"; // 16 bytes
        let config = BodyConfig::new().with_max_size(10);
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::TooLarge { size: 16, max: 10 })
        ));
    }

    #[test]
    fn chunked_invalid_size() {
        // Non-hex size line must be rejected.
        let body = b"xyz\r\nHello\r\n0\r\n\r\n";
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::InvalidChunkedEncoding { detail: _ })
        ));
    }

    #[test]
    fn chunked_missing_crlf() {
        let body = b"5\r\nHelloX0\r\n\r\n"; // Missing CRLF after data
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all();
        assert!(matches!(
            result,
            Err(BodyError::InvalidChunkedEncoding {
                detail: "expected CRLF after chunk data"
            })
        ));
    }

    #[test]
    fn chunked_incomplete() {
        let body = b"5\r\nHel"; // Incomplete chunk data
        let config = BodyConfig::default();
        let mut reader = ChunkedReader::new(body, &config);

        let result = reader.decode_all();
        assert!(matches!(result, Err(BodyError::Incomplete { .. })));
    }

    // ========================================================================
    // parse_body Tests
    // ========================================================================

    #[test]
    fn parse_body_none() {
        // BodyLength::None -> input bytes are ignored entirely.
        let config = BodyConfig::default();
        let result = parse_body(b"ignored", BodyLength::None, &config).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn parse_body_content_length() {
        let config = BodyConfig::default();
        let result = parse_body(b"Hello, World!", BodyLength::ContentLength(13), &config).unwrap();
        assert_eq!(result.unwrap(), b"Hello, World!");
    }

    #[test]
    fn parse_body_content_length_zero() {
        // Content-Length: 0 yields Some(empty), not None.
        let config = BodyConfig::default();
        let result = parse_body(b"", BodyLength::ContentLength(0), &config).unwrap();
        assert_eq!(result.unwrap(), b"");
    }

    #[test]
    fn parse_body_chunked() {
        let config = BodyConfig::default();
        let result = parse_body(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config).unwrap();
        assert_eq!(result.unwrap(), b"Hello");
    }

    #[test]
    fn parse_body_with_consumed_content_length() {
        // `consumed` equals the declared Content-Length.
        let config = BodyConfig::default();
        let (body, consumed) =
            parse_body_with_consumed(b"Hello, World!", BodyLength::ContentLength(13), &config)
                .unwrap();
        assert_eq!(body.unwrap(), b"Hello, World!");
        assert_eq!(consumed, 13);
    }

    #[test]
    fn parse_body_with_consumed_chunked() {
        // `consumed` counts the full encoded form (15 bytes), not the 5 decoded.
        let config = BodyConfig::default();
        let (body, consumed) =
            parse_body_with_consumed(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config)
                .unwrap();
        assert_eq!(body.unwrap(), b"Hello");
        assert_eq!(consumed, 15);
    }

    // ========================================================================
    // validate_content_length Tests
    // ========================================================================

    #[test]
    fn validate_content_length_ok() {
        // The limit is inclusive: exactly max_size is accepted.
        let config = BodyConfig::new().with_max_size(1000);
        assert!(validate_content_length(500, &config).is_ok());
        assert!(validate_content_length(1000, &config).is_ok());
    }

    #[test]
    fn validate_content_length_too_large() {
        let config = BodyConfig::new().with_max_size(1000);
        let result = validate_content_length(1001, &config);
        assert!(matches!(
            result,
            Err(BodyError::TooLarge {
                size: 1001,
                max: 1000
            })
        ));
    }

    // ========================================================================
    // BodyError Tests
    // ========================================================================

    #[test]
    fn body_error_display() {
        // Pins the exact Display text for every BodyError variant.
        let err = BodyError::TooLarge {
            size: 2000,
            max: 1000,
        };
        assert_eq!(
            format!("{err}"),
            "body too large: 2000 bytes exceeds limit of 1000"
        );

        let err = BodyError::InvalidChunkedEncoding {
            detail: "bad format",
        };
        assert_eq!(format!("{err}"), "invalid chunked encoding: bad format");

        let err = BodyError::Incomplete {
            received: 50,
            expected: Some(100),
        };
        assert_eq!(
            format!("{err}"),
            "incomplete body: received 50 of 100 bytes"
        );

        let err = BodyError::UnexpectedEof;
        assert_eq!(format!("{err}"), "unexpected end of body");
    }

    // ========================================================================
    // StreamingBodyConfig Tests
    // ========================================================================

    #[test]
    fn streaming_body_config_defaults() {
        let config = StreamingBodyConfig::default();
        assert_eq!(config.streaming_threshold, DEFAULT_STREAMING_THRESHOLD);
        assert_eq!(config.chunk_size, 8 * 1024);
        assert_eq!(config.max_size, DEFAULT_MAX_BODY_SIZE);
    }

    #[test]
    fn streaming_body_config_custom() {
        let config = StreamingBodyConfig::new()
            .with_streaming_threshold(1024)
            .with_chunk_size(4096)
            .with_max_size(10_000);
        assert_eq!(config.streaming_threshold, 1024);
        assert_eq!(config.chunk_size, 4096);
        assert_eq!(config.max_size, 10_000);
    }

    #[test]
    fn streaming_body_config_minimum_chunk_size() {
        let config = StreamingBodyConfig::new().with_chunk_size(0);
        // Should be clamped to minimum of 1 byte
        assert_eq!(config.chunk_size, 1);
    }

    #[test]
    fn streaming_body_config_should_stream() {
        // Threshold is exclusive: stream only strictly above it.
        let config = StreamingBodyConfig::new().with_streaming_threshold(1000);
        assert!(!config.should_stream(500));
        assert!(!config.should_stream(1000));
        assert!(config.should_stream(1001));
        assert!(config.should_stream(10000));
    }

    // ========================================================================
    // AsyncContentLengthStream Tests
    // ========================================================================

    #[test]
    fn async_content_length_stream_from_buffer() {
        use std::sync::Arc;
        use std::task::{Wake, Waker};

        // Minimal waker: polling is driven manually, wakeups are ignored.
        struct NoopWaker;
        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        fn noop_waker() -> Waker {
            Waker::from(Arc::new(NoopWaker))
        }

        // Create a mock reader that won't be used (buffer is complete)
        struct EmptyReader;
        impl AsyncRead for EmptyReader {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut asupersync::io::ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Ready(Ok(()))
            }
        }

        // chunk_size=5 forces the 13-byte body into three chunks.
        let buffer = b"Hello, World!".to_vec();
        let config = StreamingBodyConfig::new().with_chunk_size(5);
        let mut stream = AsyncContentLengthStream::new(buffer, EmptyReader, 13, &config);

        assert_eq!(stream.expected_size(), 13);
        assert_eq!(stream.bytes_read(), 0);
        assert_eq!(stream.remaining(), 13);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // First chunk: "Hello"
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"Hello");
            }
            _ => panic!("expected chunk"),
        }
        assert_eq!(stream.bytes_read(), 5);

        // Second chunk: ", Wor"
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b", Wor");
            }
            _ => panic!("expected chunk"),
        }

        // Third chunk: "ld!"
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"ld!");
            }
            _ => panic!("expected chunk"),
        }

        // End of stream
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        assert!(matches!(result, Poll::Ready(None)));
        assert!(stream.is_complete());
    }

    // ========================================================================
    // AsyncChunkedStream Tests
    // ========================================================================

    #[test]
    fn async_chunked_stream_simple() {
        use std::sync::Arc;
        use std::task::{Wake, Waker};

        struct NoopWaker;
        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        fn noop_waker() -> Waker {
            Waker::from(Arc::new(NoopWaker))
        }

        struct EmptyReader;
        impl AsyncRead for EmptyReader {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut asupersync::io::ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Ready(Ok(()))
            }
        }

        // Complete chunked body in buffer: "Hello" in chunked encoding
        let buffer = b"5\r\nHello\r\n0\r\n\r\n".to_vec();
        let config = StreamingBodyConfig::new().with_chunk_size(1024);
        let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // First chunk: "Hello"
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        match result {
            Poll::Ready(Some(Ok(chunk))) => {
                assert_eq!(chunk, b"Hello");
            }
            _ => panic!("expected chunk, got {:?}", result),
        }

        // Second poll: the state machine consumes the trailing CRLF and the
        // final zero-size chunk ("0\r\n") within this one call, ending the stream.
        let result = Pin::new(&mut stream).poll_next(&mut cx);
        // Should be complete (0\r\n\r\n)
        assert!(matches!(result, Poll::Ready(None)));
        assert!(stream.is_complete());
    }

    #[test]
    fn async_chunked_stream_multiple_chunks() {
        use std::sync::Arc;
        use std::task::{Wake, Waker};

        struct NoopWaker;
        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        fn noop_waker() -> Waker {
            Waker::from(Arc::new(NoopWaker))
        }

        struct EmptyReader;
        impl AsyncRead for EmptyReader {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                _buf: &mut asupersync::io::ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Ready(Ok(()))
            }
        }

        // "Hello, World!" in chunked encoding
        let buffer = b"5\r\nHello\r\n8\r\n, World!\r\n0\r\n\r\n".to_vec();
        let config = StreamingBodyConfig::new();
        let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Collect all chunks
        let mut collected = Vec::new();
        loop {
            match Pin::new(&mut stream).poll_next(&mut cx) {
                Poll::Ready(Some(Ok(chunk))) => collected.extend_from_slice(&chunk),
                Poll::Ready(Some(Err(e))) => panic!("unexpected error: {e}"),
                Poll::Ready(None) => break,
                // NOTE(review): Pending should not occur with a fully buffered
                // body; if it ever did, this loop would spin without yielding.
                Poll::Pending => {} // Continue processing
            }
        }

        assert_eq!(collected, b"Hello, World!");
    }
}
1799}