// fastapi_http/body.rs
//! HTTP request body handling.
//!
//! This module provides body reading with support for:
//! - Content-Length based reading with size limits
//! - Chunked transfer encoding parsing
//! - Streaming API for large bodies
//! - Integration points for async I/O
//!
//! # Body Size Limits
//!
//! By default, bodies are limited to 1MB. This prevents denial-of-service
//! attacks via large payloads. The limit is configurable per-request.
//!
//! # Streaming
//!
//! Large bodies can be read incrementally via the body reader API,
//! which supports async I/O integration with checkpoints.
//!
//! # Example
//!
//! ```ignore
//! use fastapi_http::body::{BodyConfig, ContentLengthReader};
//!
//! let config = BodyConfig::default().with_max_size(1024 * 1024);
//! let mut reader = ContentLengthReader::new(body_bytes, 100, &config)?;
//!
//! let body = reader.read_all()?;
//! ```

use crate::parser::{BodyLength, ParseError};

/// Default maximum body size (1MB).
///
/// Used by [`BodyConfig::default`]; individual requests can override it via
/// [`BodyConfig::with_max_size`].
pub const DEFAULT_MAX_BODY_SIZE: usize = 1024 * 1024;

/// Configuration for body reading.
///
/// Controls the hard size limit enforced by the readers in this module and
/// the initial allocation used when decoding into an owned buffer.
#[derive(Debug, Clone)]
pub struct BodyConfig {
    /// Maximum body size in bytes; bodies larger than this are rejected
    /// with [`BodyError::TooLarge`].
    max_size: usize,
    /// Initial buffer capacity (bytes) for streaming/decoding output.
    initial_capacity: usize,
}

44impl Default for BodyConfig {
45    fn default() -> Self {
46        Self {
47            max_size: DEFAULT_MAX_BODY_SIZE,
48            initial_capacity: 4096,
49        }
50    }
51}
52
53impl BodyConfig {
54    /// Create a new body configuration with default settings.
55    #[must_use]
56    pub fn new() -> Self {
57        Self::default()
58    }
59
60    /// Set the maximum body size.
61    #[must_use]
62    pub fn with_max_size(mut self, size: usize) -> Self {
63        self.max_size = size;
64        self
65    }
66
67    /// Set the initial buffer capacity.
68    #[must_use]
69    pub fn with_initial_capacity(mut self, capacity: usize) -> Self {
70        self.initial_capacity = capacity;
71        self
72    }
73
74    /// Returns the maximum body size.
75    #[must_use]
76    pub fn max_size(&self) -> usize {
77        self.max_size
78    }
79
80    /// Returns the initial buffer capacity.
81    #[must_use]
82    pub fn initial_capacity(&self) -> usize {
83        self.initial_capacity
84    }
85}
86
/// Error types for body reading.
///
/// Implements [`std::error::Error`]; the `Parse` variant wraps an
/// underlying [`ParseError`] and exposes it via `source()`.
#[derive(Debug)]
pub enum BodyError {
    /// Body exceeds maximum allowed size.
    TooLarge {
        /// The declared or actual size.
        size: usize,
        /// The maximum allowed size.
        max: usize,
    },
    /// Invalid chunked encoding (malformed size line, missing CRLF, etc.).
    InvalidChunkedEncoding {
        /// Description of the error.
        detail: &'static str,
    },
    /// Incomplete body (need more data).
    Incomplete {
        /// Bytes received so far.
        received: usize,
        /// Expected total size, if known (None for chunked bodies).
        expected: Option<usize>,
    },
    /// Unexpected end of input.
    UnexpectedEof,
    /// Parse error from underlying parser.
    Parse(ParseError),
}

115impl std::fmt::Display for BodyError {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self {
118            Self::TooLarge { size, max } => {
119                write!(f, "body too large: {size} bytes exceeds limit of {max}")
120            }
121            Self::InvalidChunkedEncoding { detail } => {
122                write!(f, "invalid chunked encoding: {detail}")
123            }
124            Self::Incomplete { received, expected } => {
125                if let Some(exp) = expected {
126                    write!(f, "incomplete body: received {received} of {exp} bytes")
127                } else {
128                    write!(f, "incomplete body: received {received} bytes")
129                }
130            }
131            Self::UnexpectedEof => write!(f, "unexpected end of body"),
132            Self::Parse(e) => write!(f, "parse error: {e}"),
133        }
134    }
135}
136
137impl std::error::Error for BodyError {
138    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
139        match self {
140            Self::Parse(e) => Some(e),
141            _ => None,
142        }
143    }
144}
145
146impl From<ParseError> for BodyError {
147    fn from(e: ParseError) -> Self {
148        Self::Parse(e)
149    }
150}

// ============================================================================
// Content-Length Body Reading
// ============================================================================

/// Reads a body with a known Content-Length.
///
/// This reader validates that exactly `length` bytes are provided and
/// enforces the configured size limit.
#[derive(Debug)]
pub struct ContentLengthReader<'a> {
    /// Raw bytes containing (at least the start of) the body.
    buffer: &'a [u8],
    /// The declared Content-Length value.
    length: usize,
    /// Cursor into `buffer`: bytes already consumed via `read`.
    position: usize,
    // Stored for potential future use (streaming chunk size configuration)
    #[allow(dead_code)]
    config: BodyConfig,
}

170impl<'a> ContentLengthReader<'a> {
171    /// Create a new Content-Length reader.
172    ///
173    /// # Arguments
174    ///
175    /// * `buffer` - The buffer containing body bytes
176    /// * `length` - The Content-Length value
177    /// * `config` - Body reading configuration
178    ///
179    /// # Errors
180    ///
181    /// Returns `BodyError::TooLarge` if `length` exceeds the configured maximum.
182    pub fn new(buffer: &'a [u8], length: usize, config: &BodyConfig) -> Result<Self, BodyError> {
183        // Check size limit before reading
184        if length > config.max_size {
185            return Err(BodyError::TooLarge {
186                size: length,
187                max: config.max_size,
188            });
189        }
190
191        Ok(Self {
192            buffer,
193            length,
194            position: 0,
195            config: config.clone(),
196        })
197    }
198
199    /// Returns the expected body length.
200    #[must_use]
201    pub fn length(&self) -> usize {
202        self.length
203    }
204
205    /// Returns the number of bytes remaining.
206    #[must_use]
207    pub fn remaining(&self) -> usize {
208        self.length.saturating_sub(self.position)
209    }
210
211    /// Returns true if all bytes have been read.
212    #[must_use]
213    pub fn is_complete(&self) -> bool {
214        self.position >= self.length
215    }
216
217    /// Read up to `max_bytes` into the provided buffer.
218    ///
219    /// Returns the number of bytes read.
220    pub fn read(&mut self, dest: &mut [u8]) -> Result<usize, BodyError> {
221        if self.is_complete() {
222            return Ok(0);
223        }
224
225        let available = self.buffer.len().saturating_sub(self.position);
226        let to_read = dest.len().min(self.remaining()).min(available);
227
228        if to_read == 0 && !self.is_complete() {
229            return Err(BodyError::Incomplete {
230                received: self.position,
231                expected: Some(self.length),
232            });
233        }
234
235        dest[..to_read].copy_from_slice(&self.buffer[self.position..self.position + to_read]);
236        self.position += to_read;
237
238        Ok(to_read)
239    }
240
241    /// Read all remaining body bytes.
242    ///
243    /// # Errors
244    ///
245    /// Returns `BodyError::Incomplete` if the buffer doesn't contain enough data.
246    pub fn read_all(&mut self) -> Result<Vec<u8>, BodyError> {
247        if self.buffer.len() < self.length {
248            return Err(BodyError::Incomplete {
249                received: self.buffer.len(),
250                expected: Some(self.length),
251            });
252        }
253
254        let body = self.buffer[..self.length].to_vec();
255        self.position = self.length;
256        Ok(body)
257    }
258
259    /// Read all remaining body bytes as a borrowed slice.
260    ///
261    /// This is zero-copy when the entire body is already in the buffer.
262    ///
263    /// # Errors
264    ///
265    /// Returns `BodyError::Incomplete` if the buffer doesn't contain enough data.
266    pub fn read_all_borrowed(&self) -> Result<&'a [u8], BodyError> {
267        if self.buffer.len() < self.length {
268            return Err(BodyError::Incomplete {
269                received: self.buffer.len(),
270                expected: Some(self.length),
271            });
272        }
273
274        Ok(&self.buffer[..self.length])
275    }
276}

// ============================================================================
// Chunked Transfer Encoding
// ============================================================================

/// State machine for chunked encoding parsing.
///
/// Transitions: `ChunkSize` -> `ChunkData` -> `ChunkDataEnd` -> `ChunkSize`
/// (repeating per chunk); a zero-length final chunk moves to `Trailers`,
/// then `Complete`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChunkedState {
    /// Expecting chunk size line.
    ChunkSize,
    /// Reading chunk data; `remaining` bytes of the current chunk are unread.
    ChunkData { remaining: usize },
    /// Expecting CRLF after chunk data.
    ChunkDataEnd,
    /// Reading trailers (after final chunk).
    Trailers,
    /// Complete.
    Complete,
}

/// Parses chunked transfer encoding.
///
/// Chunked encoding format:
/// ```text
/// chunk-size CRLF
/// chunk-data CRLF
/// ...
/// 0 CRLF
/// [trailers] CRLF
/// ```
#[derive(Debug)]
pub struct ChunkedReader<'a> {
    /// Raw (still-encoded) chunked body bytes.
    buffer: &'a [u8],
    /// Cursor into `buffer` (raw bytes consumed so far).
    position: usize,
    /// Current state of the chunk parser.
    state: ChunkedState,
    /// Total decoded (payload) bytes emitted so far.
    total_size: usize,
    /// Size-limit configuration enforced during decoding.
    config: BodyConfig,
}

316impl<'a> ChunkedReader<'a> {
317    /// Create a new chunked reader.
318    ///
319    /// # Arguments
320    ///
321    /// * `buffer` - The buffer containing chunked body data
322    /// * `config` - Body reading configuration
323    #[must_use]
324    pub fn new(buffer: &'a [u8], config: &BodyConfig) -> Self {
325        Self {
326            buffer,
327            position: 0,
328            state: ChunkedState::ChunkSize,
329            total_size: 0,
330            config: config.clone(),
331        }
332    }
333
334    /// Returns true if parsing is complete.
335    #[must_use]
336    pub fn is_complete(&self) -> bool {
337        self.state == ChunkedState::Complete
338    }
339
340    /// Returns the total decoded body size so far.
341    #[must_use]
342    pub fn total_size(&self) -> usize {
343        self.total_size
344    }
345
346    /// Parse the chunk size line and return the size.
347    fn parse_chunk_size(&self) -> Result<(usize, usize), BodyError> {
348        let remaining = &self.buffer[self.position..];
349
350        // Find CRLF
351        let line_end =
352            remaining
353                .windows(2)
354                .position(|w| w == b"\r\n")
355                .ok_or(BodyError::Incomplete {
356                    received: self.position,
357                    expected: None,
358                })?;
359
360        let size_line = &remaining[..line_end];
361
362        // Parse hex size (ignore chunk extensions after semicolon)
363        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';') {
364            &size_line[..semi]
365        } else {
366            size_line
367        };
368
369        let size_str =
370            std::str::from_utf8(size_str).map_err(|_| BodyError::InvalidChunkedEncoding {
371                detail: "invalid UTF-8 in chunk size",
372            })?;
373
374        let size = usize::from_str_radix(size_str.trim(), 16).map_err(|_| {
375            BodyError::InvalidChunkedEncoding {
376                detail: "invalid hex chunk size",
377            }
378        })?;
379
380        // Reject unreasonably large chunk sizes early to prevent DoS.
381        // Individual chunks over 16MB are almost certainly attacks.
382        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
383        if size > MAX_SINGLE_CHUNK {
384            return Err(BodyError::InvalidChunkedEncoding {
385                detail: "chunk size exceeds 16MB limit",
386            });
387        }
388
389        // bytes_consumed = size_line + CRLF
390        Ok((size, line_end + 2))
391    }
392
393    /// Decode all chunks into a single buffer.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if:
398    /// - The total size exceeds the configured limit
399    /// - The chunked encoding is malformed
400    /// - The buffer is incomplete
401    pub fn decode_all(&mut self) -> Result<Vec<u8>, BodyError> {
402        let mut output = Vec::with_capacity(self.config.initial_capacity);
403
404        loop {
405            match self.state {
406                ChunkedState::ChunkSize => {
407                    let (size, consumed) = self.parse_chunk_size()?;
408                    self.position += consumed;
409
410                    // Check size limit
411                    let new_total = self.total_size.saturating_add(size);
412                    if new_total > self.config.max_size {
413                        return Err(BodyError::TooLarge {
414                            size: new_total,
415                            max: self.config.max_size,
416                        });
417                    }
418
419                    if size == 0 {
420                        // Final chunk - transition to trailers
421                        self.state = ChunkedState::Trailers;
422                    } else {
423                        self.state = ChunkedState::ChunkData { remaining: size };
424                    }
425                }
426                ChunkedState::ChunkData { remaining } => {
427                    let available = self.buffer.len().saturating_sub(self.position);
428                    if available < remaining {
429                        return Err(BodyError::Incomplete {
430                            received: self.total_size + (remaining - available),
431                            expected: None,
432                        });
433                    }
434
435                    // Copy chunk data
436                    let chunk_data = &self.buffer[self.position..self.position + remaining];
437                    output.extend_from_slice(chunk_data);
438                    self.position += remaining;
439                    self.total_size += remaining;
440
441                    self.state = ChunkedState::ChunkDataEnd;
442                }
443                ChunkedState::ChunkDataEnd => {
444                    // Expect CRLF
445                    let remaining = &self.buffer[self.position..];
446                    if remaining.len() < 2 {
447                        return Err(BodyError::Incomplete {
448                            received: self.total_size,
449                            expected: None,
450                        });
451                    }
452
453                    if &remaining[..2] != b"\r\n" {
454                        return Err(BodyError::InvalidChunkedEncoding {
455                            detail: "expected CRLF after chunk data",
456                        });
457                    }
458
459                    self.position += 2;
460                    self.state = ChunkedState::ChunkSize;
461                }
462                ChunkedState::Trailers => {
463                    // Skip trailers until empty line
464                    let remaining = &self.buffer[self.position..];
465
466                    // Look for CRLF (empty line) or trailer headers
467                    if remaining.starts_with(b"\r\n") {
468                        self.position += 2;
469                        self.state = ChunkedState::Complete;
470                    } else {
471                        // Find end of trailer line
472                        let line_end = remaining.windows(2).position(|w| w == b"\r\n");
473                        match line_end {
474                            Some(pos) => {
475                                // Skip this trailer
476                                self.position += pos + 2;
477                                // Stay in Trailers state to handle more trailers or final CRLF
478                            }
479                            None => {
480                                return Err(BodyError::Incomplete {
481                                    received: self.total_size,
482                                    expected: None,
483                                });
484                            }
485                        }
486                    }
487                }
488                ChunkedState::Complete => {
489                    break;
490                }
491            }
492        }
493
494        Ok(output)
495    }
496
497    /// Returns the number of bytes consumed from the raw buffer.
498    #[must_use]
499    pub fn bytes_consumed(&self) -> usize {
500        self.position
501    }
502}

// ============================================================================
// Body Parsing from Headers
// ============================================================================

508/// Parse a request body from a buffer given the body length indicator.
509///
510/// This is the main entry point for body parsing. It dispatches to the
511/// appropriate reader based on Content-Length or Transfer-Encoding.
512///
513/// # Arguments
514///
515/// * `buffer` - The buffer containing the body (after headers)
516/// * `body_length` - The body length indicator from header parsing
517/// * `config` - Body reading configuration
518///
519/// # Returns
520///
521/// Returns the parsed body bytes, or `None` if no body is expected.
522///
523/// # Errors
524///
525/// Returns an error if:
526/// - The body exceeds the configured size limit
527/// - The chunked encoding is malformed
528/// - The buffer is incomplete
529pub fn parse_body(
530    buffer: &[u8],
531    body_length: BodyLength,
532    config: &BodyConfig,
533) -> Result<Option<Vec<u8>>, BodyError> {
534    let (body, _) = parse_body_with_consumed(buffer, body_length, config)?;
535    Ok(body)
536}
537
538/// Parse a request body and return both the decoded body and bytes consumed.
539///
540/// This is useful for incremental parsing to determine request boundaries.
541///
542/// # Errors
543///
544/// Returns an error if:
545/// - The body exceeds the configured size limit
546/// - The chunked encoding is malformed
547/// - The buffer is incomplete
548pub fn parse_body_with_consumed(
549    buffer: &[u8],
550    body_length: BodyLength,
551    config: &BodyConfig,
552) -> Result<(Option<Vec<u8>>, usize), BodyError> {
553    match body_length {
554        BodyLength::None => Ok((None, 0)),
555        BodyLength::ContentLength(len) => {
556            if len == 0 {
557                return Ok((Some(Vec::new()), 0));
558            }
559            let mut reader = ContentLengthReader::new(buffer, len, config)?;
560            let body = reader.read_all()?;
561            Ok((Some(body), len))
562        }
563        BodyLength::Chunked => {
564            let mut reader = ChunkedReader::new(buffer, config);
565            let body = reader.decode_all()?;
566            Ok((Some(body), reader.bytes_consumed()))
567        }
568        BodyLength::Conflicting => Err(BodyError::InvalidChunkedEncoding {
569            detail: "conflicting body length indicators",
570        }),
571    }
572}
573
574/// Validates body size against Content-Length header before reading.
575///
576/// Call this early (before buffering) to reject oversized requests.
577///
578/// # Arguments
579///
580/// * `content_length` - The Content-Length header value
581/// * `config` - Body reading configuration
582///
583/// # Errors
584///
585/// Returns `BodyError::TooLarge` if the content length exceeds the limit.
586pub fn validate_content_length(
587    content_length: usize,
588    config: &BodyConfig,
589) -> Result<(), BodyError> {
590    if content_length > config.max_size {
591        return Err(BodyError::TooLarge {
592            size: content_length,
593            max: config.max_size,
594        });
595    }
596    Ok(())
597}

// ============================================================================
// Async Streaming Body Readers
// ============================================================================

use asupersync::io::AsyncRead;
use asupersync::stream::Stream;
use fastapi_core::RequestBodyStreamError;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Default threshold for enabling streaming (64KB).
///
/// Bodies larger than this will be streamed rather than buffered entirely.
/// See [`StreamingBodyConfig::should_stream`].
pub const DEFAULT_STREAMING_THRESHOLD: usize = 64 * 1024;

/// Configuration for async body streaming.
#[derive(Debug, Clone)]
pub struct StreamingBodyConfig {
    /// Threshold (bytes) above which bodies are streamed instead of
    /// buffered whole.
    pub streaming_threshold: usize,
    /// Size of each read chunk in bytes.
    pub chunk_size: usize,
    /// Maximum body size (enforced during streaming).
    pub max_size: usize,
}

625impl Default for StreamingBodyConfig {
626    fn default() -> Self {
627        Self {
628            streaming_threshold: DEFAULT_STREAMING_THRESHOLD,
629            chunk_size: 8 * 1024, // 8KB chunks
630            max_size: DEFAULT_MAX_BODY_SIZE,
631        }
632    }
633}
634
635impl StreamingBodyConfig {
636    /// Create a new streaming config with default settings.
637    #[must_use]
638    pub fn new() -> Self {
639        Self::default()
640    }
641
642    /// Set the streaming threshold.
643    #[must_use]
644    pub fn with_streaming_threshold(mut self, threshold: usize) -> Self {
645        self.streaming_threshold = threshold;
646        self
647    }
648
649    /// Set the chunk size for reads.
650    ///
651    /// Note: For network efficiency, values below 1KB are allowed but not recommended
652    /// for production use.
653    #[must_use]
654    pub fn with_chunk_size(mut self, size: usize) -> Self {
655        self.chunk_size = size.max(1); // Minimum 1 byte (for testing)
656        self
657    }
658
659    /// Set the maximum body size.
660    #[must_use]
661    pub fn with_max_size(mut self, size: usize) -> Self {
662        self.max_size = size;
663        self
664    }
665
666    /// Returns true if the given content length should be streamed.
667    #[must_use]
668    pub fn should_stream(&self, content_length: usize) -> bool {
669        content_length > self.streaming_threshold
670    }
671}
672
/// An async stream that reads a Content-Length body in chunks.
///
/// This stream first yields any buffered data from the parser, then
/// continues reading from the underlying async reader until the
/// expected length is reached.
///
/// # Memory Efficiency
///
/// Only one chunk is buffered at a time, making this suitable for
/// streaming large request bodies without excessive memory usage.
pub struct AsyncContentLengthStream<R> {
    /// Optional reader for more data (None after initial buffer exhausted and no reader).
    reader: Option<R>,
    /// Initial buffer from parser (bytes read past the headers).
    initial_buffer: Vec<u8>,
    /// Position in initial buffer (bytes already yielded from it).
    initial_position: usize,
    /// Expected total size from Content-Length.
    expected_size: usize,
    /// Bytes read so far (initial buffer + reader combined).
    bytes_read: usize,
    /// Chunk size for reads.
    chunk_size: usize,
    /// Maximum allowed size.
    max_size: usize,
    /// Read buffer (reused across reads to avoid per-poll allocation).
    read_buffer: Vec<u8>,
    /// Whether the stream is complete (terminal: poll returns None).
    complete: bool,
    /// Whether an error occurred (terminal: poll returns None).
    error: bool,
}

706impl<R> AsyncContentLengthStream<R>
707where
708    R: AsyncRead + Unpin + Send + Sync + 'static,
709{
710    /// Create a new Content-Length stream.
711    ///
712    /// # Arguments
713    ///
714    /// * `initial_buffer` - Any bytes already buffered by the parser
715    /// * `reader` - The async reader for remaining bytes
716    /// * `content_length` - Expected total body size
717    /// * `config` - Streaming configuration
718    pub fn new(
719        initial_buffer: Vec<u8>,
720        reader: R,
721        content_length: usize,
722        config: &StreamingBodyConfig,
723    ) -> Self {
724        Self {
725            reader: Some(reader),
726            initial_buffer,
727            initial_position: 0,
728            expected_size: content_length,
729            bytes_read: 0,
730            chunk_size: config.chunk_size,
731            max_size: config.max_size,
732            read_buffer: vec![0u8; config.chunk_size],
733            complete: false,
734            error: false,
735        }
736    }
737
738    /// Create a Content-Length stream with default config.
739    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R, content_length: usize) -> Self {
740        Self::new(
741            initial_buffer,
742            reader,
743            content_length,
744            &StreamingBodyConfig::default(),
745        )
746    }
747
748    /// Returns the expected total size.
749    #[must_use]
750    pub fn expected_size(&self) -> usize {
751        self.expected_size
752    }
753
754    /// Returns the number of bytes read so far.
755    #[must_use]
756    pub fn bytes_read(&self) -> usize {
757        self.bytes_read
758    }
759
760    /// Returns the remaining bytes to read.
761    #[must_use]
762    pub fn remaining(&self) -> usize {
763        self.expected_size.saturating_sub(self.bytes_read)
764    }
765
766    /// Returns true if the stream is complete.
767    #[must_use]
768    pub fn is_complete(&self) -> bool {
769        self.complete
770    }
771
772    fn initial_remaining(&self) -> usize {
773        self.initial_buffer
774            .len()
775            .saturating_sub(self.initial_position)
776    }
777}
778
impl<R> Stream for AsyncContentLengthStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    /// Yield the next decoded chunk of the body.
    ///
    /// Order of operations per poll:
    /// 1. terminal-state and size-limit checks
    /// 2. drain the parser's initial buffer, one chunk at a time
    /// 3. read from the underlying async reader
    ///
    /// The reader is `take()`n around the poll to avoid borrowing `self`
    /// and `self.reader` mutably at once; it is restored on every path
    /// except terminal errors.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Check if complete or error — both are terminal.
        if self.complete || self.error {
            return Poll::Ready(None);
        }

        // Check size limit
        if self.bytes_read > self.max_size {
            self.error = true;
            let bytes_read = self.bytes_read;
            let max_size = self.max_size;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: bytes_read,
                max: max_size,
            })));
        }

        // Check if we've read all expected bytes
        if self.bytes_read >= self.expected_size {
            self.complete = true;
            return Poll::Ready(None);
        }

        let remaining_for_body = self.expected_size.saturating_sub(self.bytes_read);
        let remaining_budget = self.max_size.saturating_sub(self.bytes_read);
        // More body is owed but the size budget is exhausted: fail rather
        // than silently truncate.
        if remaining_for_body > 0 && remaining_budget == 0 {
            self.error = true;
            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                received: self.bytes_read.saturating_add(1),
                max: self.max_size,
            })));
        }

        // First, try to yield from initial buffer
        let initial_remaining = self.initial_remaining();
        if initial_remaining > 0 {
            // Chunk is capped by all three: configured chunk size, bytes
            // still owed for the body, and the remaining size budget.
            let chunk_size = self
                .chunk_size
                .min(initial_remaining)
                .min(remaining_for_body)
                .min(remaining_budget);

            if chunk_size > 0 {
                let start = self.initial_position;
                let chunk = self.initial_buffer[start..start + chunk_size].to_vec();
                self.initial_position += chunk_size;
                self.bytes_read += chunk_size;
                return Poll::Ready(Some(Ok(chunk)));
            }
        }

        // Initial buffer exhausted, read from reader
        let remaining = self.expected_size.saturating_sub(self.bytes_read);
        let to_read = self.chunk_size.min(remaining).min(remaining_budget);

        if to_read == 0 {
            self.complete = true;
            return Poll::Ready(None);
        }

        // Ensure buffer is sized appropriately
        if self.read_buffer.len() < to_read {
            self.read_buffer.resize(to_read, 0);
        }

        // Take reader temporarily to avoid borrow conflicts.
        // A missing reader here means the connection was lost mid-body.
        let mut reader = match self.reader.take() {
            Some(r) => r,
            None => {
                self.error = true;
                return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
            }
        };

        // Perform the read and extract result before modifying self
        let read_result = {
            let mut read_buf = asupersync::io::ReadBuf::new(&mut self.read_buffer[..to_read]);
            match Pin::new(&mut reader).poll_read(cx, &mut read_buf) {
                Poll::Ready(Ok(())) => {
                    let n = read_buf.filled().len();
                    let chunk = read_buf.filled().to_vec();
                    Poll::Ready(Ok((n, chunk)))
                }
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Pending => Poll::Pending,
            }
        };

        match read_result {
            Poll::Ready(Ok((n, chunk))) => {
                if n == 0 {
                    // EOF before expected bytes - incomplete body
                    self.error = true;
                    return Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed)));
                }

                self.bytes_read += n;

                // Put reader back
                self.reader = Some(reader);

                Poll::Ready(Some(Ok(chunk)))
            }
            Poll::Ready(Err(e)) => {
                self.error = true;
                Poll::Ready(Some(Err(RequestBodyStreamError::Io(e.to_string()))))
            }
            Poll::Pending => {
                // Put reader back before returning Pending
                self.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}

/// Parsing state for chunked encoding (async variant of [`ChunkedState`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncChunkedState {
    /// Parsing chunk size line.
    ChunkSize,
    /// Reading chunk data; `remaining` bytes of the current chunk are unread.
    ChunkData { remaining: usize },
    /// Expecting CRLF after chunk data.
    ChunkDataEnd,
    /// Reading trailers (after final chunk).
    Trailers,
    /// Complete (terminal).
    Complete,
    /// Error (terminal).
    Error,
}

/// An async stream that reads a chunked-encoded body.
///
/// This stream parses chunked transfer encoding on the fly,
/// yielding decoded chunks as they become available.
///
/// # Chunked Encoding Format
///
/// ```text
/// chunk-size CRLF
/// chunk-data CRLF
/// ...
/// 0 CRLF
/// [trailers] CRLF
/// ```
pub struct AsyncChunkedStream<R> {
    /// Reader for more data (used when buffer is exhausted).
    #[allow(dead_code)]
    reader: Option<R>,
    /// Parsing state.
    state: AsyncChunkedState,
    /// Total decoded bytes so far.
    bytes_decoded: usize,
    /// Maximum allowed size (decoded bytes).
    max_size: usize,
    /// Chunk size for reads.
    chunk_size: usize,
    /// Read buffer (used when socket reads are needed).
    #[allow(dead_code)]
    read_buffer: Vec<u8>,
    /// Buffer for initial data from parser + any data read from socket.
    buffer: Vec<u8>,
    /// Position in buffer (raw encoded bytes consumed).
    position: usize,
}

953impl<R> AsyncChunkedStream<R>
954where
955    R: AsyncRead + Unpin + Send + Sync + 'static,
956{
957    /// Create a new chunked stream.
958    ///
959    /// # Arguments
960    ///
961    /// * `initial_buffer` - Any bytes already buffered by the parser
962    /// * `reader` - The async reader for remaining bytes
963    /// * `config` - Streaming configuration
964    ///
965    /// # Panics
966    ///
967    /// Panics if `initial_buffer` exceeds `config.max_size`. Use `try_new` for
968    /// fallible construction.
969    pub fn new(initial_buffer: Vec<u8>, reader: R, config: &StreamingBodyConfig) -> Self {
970        assert!(
971            initial_buffer.len() <= config.max_size,
972            "initial buffer size {} exceeds max size {}",
973            initial_buffer.len(),
974            config.max_size
975        );
976        Self {
977            reader: Some(reader),
978            state: AsyncChunkedState::ChunkSize,
979            bytes_decoded: 0,
980            max_size: config.max_size,
981            chunk_size: config.chunk_size,
982            read_buffer: vec![0u8; config.chunk_size],
983            buffer: initial_buffer,
984            position: 0,
985        }
986    }
987
988    /// Try to create a new chunked stream, returning error if initial buffer is too large.
989    ///
990    /// # Errors
991    ///
992    /// Returns error if `initial_buffer.len()` exceeds `config.max_size`.
993    pub fn try_new(
994        initial_buffer: Vec<u8>,
995        reader: R,
996        config: &StreamingBodyConfig,
997    ) -> Result<Self, RequestBodyStreamError> {
998        if initial_buffer.len() > config.max_size {
999            return Err(RequestBodyStreamError::Io(format!(
1000                "initial buffer size {} exceeds max size {}",
1001                initial_buffer.len(),
1002                config.max_size
1003            )));
1004        }
1005        Ok(Self {
1006            reader: Some(reader),
1007            state: AsyncChunkedState::ChunkSize,
1008            bytes_decoded: 0,
1009            max_size: config.max_size,
1010            chunk_size: config.chunk_size,
1011            read_buffer: vec![0u8; config.chunk_size],
1012            buffer: initial_buffer,
1013            position: 0,
1014        })
1015    }
1016
1017    /// Create a chunked stream with default config.
1018    pub fn with_defaults(initial_buffer: Vec<u8>, reader: R) -> Self {
1019        Self::new(initial_buffer, reader, &StreamingBodyConfig::default())
1020    }
1021
1022    /// Returns the total decoded bytes so far.
1023    #[must_use]
1024    pub fn bytes_decoded(&self) -> usize {
1025        self.bytes_decoded
1026    }
1027
1028    /// Returns true if the stream is complete.
1029    #[must_use]
1030    pub fn is_complete(&self) -> bool {
1031        self.state == AsyncChunkedState::Complete
1032    }
1033
1034    /// Get remaining buffer bytes.
1035    fn buffer_remaining(&self) -> &[u8] {
1036        &self.buffer[self.position..]
1037    }
1038
1039    /// Consume bytes from buffer.
1040    fn consume(&mut self, n: usize) {
1041        self.position += n;
1042    }
1043
1044    fn compact_buffer_if_needed(&mut self) {
1045        if self.position == 0 {
1046            return;
1047        }
1048        if self.position >= self.buffer.len() {
1049            self.buffer.clear();
1050            self.position = 0;
1051            return;
1052        }
1053
1054        // Avoid unbounded growth: once we've consumed enough, shift the unread tail down.
1055        let should_compact = self.position > 8 * 1024 || self.position > (self.buffer.len() / 2);
1056        if should_compact {
1057            self.buffer.drain(..self.position);
1058            self.position = 0;
1059        }
1060    }
1061
1062    fn poll_read_more_sized(
1063        &mut self,
1064        cx: &mut Context<'_>,
1065        max_read: usize,
1066    ) -> Poll<Result<usize, RequestBodyStreamError>> {
1067        self.compact_buffer_if_needed();
1068
1069        let max_read = max_read.min(self.read_buffer.len());
1070        if max_read == 0 {
1071            self.state = AsyncChunkedState::Error;
1072            return Poll::Ready(Err(RequestBodyStreamError::Io(
1073                "invalid read buffer size".to_string(),
1074            )));
1075        }
1076
1077        let mut reader = match self.reader.take() {
1078            Some(r) => r,
1079            None => {
1080                self.state = AsyncChunkedState::Error;
1081                return Poll::Ready(Err(RequestBodyStreamError::ConnectionClosed));
1082            }
1083        };
1084
1085        let read_result = {
1086            let mut read_buf = asupersync::io::ReadBuf::new(&mut self.read_buffer[..max_read]);
1087            match Pin::new(&mut reader).poll_read(cx, &mut read_buf) {
1088                Poll::Ready(Ok(())) => {
1089                    let filled = read_buf.filled();
1090                    Poll::Ready(Ok(filled.len()))
1091                }
1092                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1093                Poll::Pending => Poll::Pending,
1094            }
1095        };
1096
1097        match read_result {
1098            Poll::Ready(Ok(n)) => {
1099                if n == 0 {
1100                    self.state = AsyncChunkedState::Error;
1101                    self.reader = Some(reader);
1102                    return Poll::Ready(Err(RequestBodyStreamError::ConnectionClosed));
1103                }
1104                self.buffer.extend_from_slice(&self.read_buffer[..n]);
1105                self.reader = Some(reader);
1106                Poll::Ready(Ok(n))
1107            }
1108            Poll::Ready(Err(e)) => {
1109                self.state = AsyncChunkedState::Error;
1110                self.reader = Some(reader);
1111                Poll::Ready(Err(RequestBodyStreamError::Io(e.to_string())))
1112            }
1113            Poll::Pending => {
1114                self.reader = Some(reader);
1115                Poll::Pending
1116            }
1117        }
1118    }
1119}
1120
impl<R> Stream for AsyncChunkedStream<R>
where
    R: AsyncRead + Unpin + Send + Sync + 'static,
{
    type Item = Result<Vec<u8>, RequestBodyStreamError>;

    /// Drive the chunked-transfer state machine.
    ///
    /// Each call makes progress until it can yield one decoded data chunk,
    /// finish the body (`None`), report an error, or must wait on the reader
    /// (`Pending`). After `Complete` or `Error` the stream is fused: it
    /// always returns `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fused behavior: terminal states never yield again.
        if self.state == AsyncChunkedState::Complete || self.state == AsyncChunkedState::Error {
            return Poll::Ready(None);
        }

        loop {
            match self.state {
                AsyncChunkedState::ChunkSize => {
                    // Try to find a complete chunk-size line (terminated by CRLF) in the buffer.
                    let remaining = self.buffer_remaining();
                    if let Some(crlf_pos) = remaining.windows(2).position(|w| w == b"\r\n") {
                        // Parse chunk size
                        let size_line = &remaining[..crlf_pos];

                        // Parse hex size (ignore extensions after semicolon)
                        let size_str = if let Some(semi) = size_line.iter().position(|&b| b == b';')
                        {
                            &size_line[..semi]
                        } else {
                            size_line
                        };

                        let size_str = match std::str::from_utf8(size_str) {
                            Ok(s) => s.trim(),
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid UTF-8 in chunk size".to_string(),
                                ))));
                            }
                        };

                        let chunk_size = match usize::from_str_radix(size_str, 16) {
                            Ok(s) => s,
                            Err(_) => {
                                self.state = AsyncChunkedState::Error;
                                return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                    "invalid hex chunk size".to_string(),
                                ))));
                            }
                        };

                        // Enforce max size before consuming/streaming this chunk.
                        if chunk_size > 0
                            && self.bytes_decoded.saturating_add(chunk_size) > self.max_size
                        {
                            self.state = AsyncChunkedState::Error;
                            let bytes_decoded = self.bytes_decoded;
                            let max_size = self.max_size;
                            return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                                received: bytes_decoded,
                                max: max_size,
                            })));
                        }

                        // Reject unreasonably large chunk sizes early
                        const MAX_SINGLE_CHUNK: usize = 16 * 1024 * 1024;
                        if chunk_size > MAX_SINGLE_CHUNK {
                            self.state = AsyncChunkedState::Error;
                            return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                                "chunk size exceeds 16MB limit".to_string(),
                            ))));
                        }

                        // Consume the size line and its CRLF only after all validation passed.
                        self.consume(crlf_pos + 2);

                        if chunk_size == 0 {
                            // Final chunk - transition to trailers (includes required CRLF)
                            self.state = AsyncChunkedState::Trailers;
                            continue;
                        }

                        self.state = AsyncChunkedState::ChunkData {
                            remaining: chunk_size,
                        };
                        continue;
                    }

                    // Need more data from the reader (socket).
                    //
                    // Defensive cap: reject absurdly long chunk size lines without CRLF.
                    const MAX_CHUNK_SIZE_LINE: usize = 1024;
                    if remaining.len() > MAX_CHUNK_SIZE_LINE {
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "chunk size line too long".to_string(),
                        ))));
                    }

                    // Read minimally (one byte) to avoid consuming bytes beyond request
                    // boundaries on keep-alive connections.
                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::ChunkData { remaining } => {
                    // Ensure we never yield bytes beyond max_size.
                    // (Defensive: the ChunkSize check should already prevent this.)
                    if remaining > 0 && self.bytes_decoded >= self.max_size {
                        self.state = AsyncChunkedState::Error;
                        let bytes_decoded = self.bytes_decoded;
                        let max_size = self.max_size;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge {
                            received: bytes_decoded,
                            max: max_size,
                        })));
                    }

                    // Read chunk data from buffer, capped to the configured yield size.
                    let buffer_remaining = self.buffer_remaining();
                    let to_read = remaining.min(buffer_remaining.len()).min(self.chunk_size);

                    if to_read > 0 {
                        let chunk = buffer_remaining[..to_read].to_vec();
                        self.consume(to_read);
                        self.bytes_decoded += to_read;

                        let new_remaining = remaining - to_read;
                        if new_remaining == 0 {
                            self.state = AsyncChunkedState::ChunkDataEnd;
                        } else {
                            self.state = AsyncChunkedState::ChunkData {
                                remaining: new_remaining,
                            };
                        }

                        return Poll::Ready(Some(Ok(chunk)));
                    }

                    // Need more data from the reader (socket).
                    // Read at most the remaining bytes in this chunk to avoid read-ahead.
                    let want = remaining.min(self.chunk_size).max(1);
                    match self.poll_read_more_sized(cx, want) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::ChunkDataEnd => {
                    // Expect the CRLF that terminates the chunk payload.
                    let remaining = self.buffer_remaining();
                    if remaining.len() >= 2 {
                        if &remaining[..2] == b"\r\n" {
                            self.consume(2);
                            self.state = AsyncChunkedState::ChunkSize;
                            continue;
                        }
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "expected CRLF after chunk data".to_string(),
                        ))));
                    }

                    // Need more data from the reader (socket).
                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::Trailers => {
                    // Skip trailers until empty line (CRLF). Trailers are not exposed to the app.
                    //
                    // Read minimally to avoid swallowing bytes that belong to the next request
                    // on keep-alive connections.
                    let remaining = self.buffer_remaining();

                    // An empty line ends the trailer section and the whole body.
                    if remaining.len() >= 2 && &remaining[..2] == b"\r\n" {
                        self.consume(2);
                        self.state = AsyncChunkedState::Complete;
                        return Poll::Ready(None);
                    }

                    // Defensive cap: trailer lines must be reasonably bounded.
                    const MAX_TRAILER_LINE: usize = 8 * 1024;
                    if remaining.len() > MAX_TRAILER_LINE {
                        self.state = AsyncChunkedState::Error;
                        return Poll::Ready(Some(Err(RequestBodyStreamError::Io(
                            "trailer line too long".to_string(),
                        ))));
                    }

                    if let Some(crlf_pos) = remaining.windows(2).position(|w| w == b"\r\n") {
                        // Skip one trailer line (header) and continue.
                        self.consume(crlf_pos + 2);
                        continue;
                    }

                    match self.poll_read_more_sized(cx, 1) {
                        Poll::Ready(Ok(_n)) => {}
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                AsyncChunkedState::Complete | AsyncChunkedState::Error => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1330
1331/// Create a streaming body from a Content-Length body.
1332///
1333/// Returns a `fastapi_core::Body::Stream` that yields chunks from the given
1334/// initial buffer and async reader.
1335///
1336/// # Arguments
1337///
1338/// * `initial_buffer` - Any bytes already buffered by the parser
1339/// * `reader` - The async reader for remaining bytes
1340/// * `content_length` - Expected total body size
1341/// * `config` - Streaming configuration
1342pub fn create_content_length_stream<R>(
1343    initial_buffer: Vec<u8>,
1344    reader: R,
1345    content_length: usize,
1346    config: &StreamingBodyConfig,
1347) -> fastapi_core::Body
1348where
1349    R: AsyncRead + Unpin + Send + Sync + 'static,
1350{
1351    let stream = AsyncContentLengthStream::new(initial_buffer, reader, content_length, config);
1352    fastapi_core::Body::streaming_with_size(stream, content_length)
1353}
1354
1355/// Create a streaming body from a chunked transfer-encoded body.
1356///
1357/// Returns a `fastapi_core::Body::Stream` that yields decoded chunks.
1358///
1359/// # Arguments
1360///
1361/// * `initial_buffer` - Any bytes already buffered by the parser
1362/// * `reader` - The async reader for remaining bytes
1363/// * `config` - Streaming configuration
1364pub fn create_chunked_stream<R>(
1365    initial_buffer: Vec<u8>,
1366    reader: R,
1367    config: &StreamingBodyConfig,
1368) -> fastapi_core::Body
1369where
1370    R: AsyncRead + Unpin + Send + Sync + 'static,
1371{
1372    let stream = AsyncChunkedStream::new(initial_buffer, reader, config);
1373    fastapi_core::Body::streaming(stream)
1374}
1375
1376// ============================================================================
1377// Tests
1378// ============================================================================
1379
1380#[cfg(test)]
1381mod tests {
1382    use super::*;
1383
1384    // ========================================================================
1385    // BodyConfig Tests
1386    // ========================================================================
1387
1388    #[test]
1389    fn body_config_defaults() {
1390        let config = BodyConfig::default();
1391        assert_eq!(config.max_size(), DEFAULT_MAX_BODY_SIZE);
1392        assert_eq!(config.initial_capacity(), 4096);
1393    }
1394
1395    #[test]
1396    fn body_config_custom() {
1397        let config = BodyConfig::new()
1398            .with_max_size(2048)
1399            .with_initial_capacity(1024);
1400        assert_eq!(config.max_size(), 2048);
1401        assert_eq!(config.initial_capacity(), 1024);
1402    }
1403
1404    // ========================================================================
1405    // Content-Length Reader Tests
1406    // ========================================================================
1407
1408    #[test]
1409    fn content_length_basic() {
1410        let body = b"Hello, World!";
1411        let config = BodyConfig::default();
1412        let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
1413
1414        assert_eq!(reader.length(), 13);
1415        assert_eq!(reader.remaining(), 13);
1416        assert!(!reader.is_complete());
1417
1418        let result = reader.read_all().unwrap();
1419        assert_eq!(result, b"Hello, World!");
1420        assert!(reader.is_complete());
1421    }
1422
1423    #[test]
1424    fn content_length_zero() {
1425        let body = b"";
1426        let config = BodyConfig::default();
1427        let mut reader = ContentLengthReader::new(body, 0, &config).unwrap();
1428
1429        assert_eq!(reader.length(), 0);
1430        assert!(reader.is_complete());
1431
1432        let result = reader.read_all().unwrap();
1433        assert!(result.is_empty());
1434    }
1435
1436    #[test]
1437    fn content_length_too_large() {
1438        let body = b"small";
1439        let config = BodyConfig::new().with_max_size(3);
1440        let result = ContentLengthReader::new(body, 100, &config);
1441
1442        assert!(matches!(
1443            result,
1444            Err(BodyError::TooLarge { size: 100, max: 3 })
1445        ));
1446    }
1447
1448    #[test]
1449    fn content_length_incomplete() {
1450        let body = b"Hello";
1451        let config = BodyConfig::default();
1452        let mut reader = ContentLengthReader::new(body, 10, &config).unwrap();
1453
1454        let result = reader.read_all();
1455        assert!(matches!(
1456            result,
1457            Err(BodyError::Incomplete {
1458                received: 5,
1459                expected: Some(10)
1460            })
1461        ));
1462    }
1463
1464    #[test]
1465    fn content_length_borrowed() {
1466        let body = b"Hello, World!";
1467        let config = BodyConfig::default();
1468        let reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
1469
1470        let borrowed = reader.read_all_borrowed().unwrap();
1471        assert_eq!(borrowed, body);
1472        // Verify it's the same memory location (zero-copy)
1473        assert_eq!(borrowed.as_ptr(), body.as_ptr());
1474    }
1475
1476    #[test]
1477    fn content_length_incremental_read() {
1478        let body = b"Hello, World!";
1479        let config = BodyConfig::default();
1480        let mut reader = ContentLengthReader::new(body, body.len(), &config).unwrap();
1481
1482        let mut buf = [0u8; 5];
1483
1484        // First read
1485        let n = reader.read(&mut buf).unwrap();
1486        assert_eq!(n, 5);
1487        assert_eq!(&buf[..n], b"Hello");
1488        assert_eq!(reader.remaining(), 8);
1489
1490        // Second read
1491        let n = reader.read(&mut buf).unwrap();
1492        assert_eq!(n, 5);
1493        assert_eq!(&buf[..n], b", Wor");
1494        assert_eq!(reader.remaining(), 3);
1495
1496        // Third read
1497        let n = reader.read(&mut buf).unwrap();
1498        assert_eq!(n, 3);
1499        assert_eq!(&buf[..n], b"ld!");
1500        assert!(reader.is_complete());
1501
1502        // No more data
1503        let n = reader.read(&mut buf).unwrap();
1504        assert_eq!(n, 0);
1505    }
1506
1507    // ========================================================================
1508    // Chunked Encoding Tests
1509    // ========================================================================
1510
1511    #[test]
1512    fn chunked_single_chunk() {
1513        let body = b"5\r\nHello\r\n0\r\n\r\n";
1514        let config = BodyConfig::default();
1515        let mut reader = ChunkedReader::new(body, &config);
1516
1517        let result = reader.decode_all().unwrap();
1518        assert_eq!(result, b"Hello");
1519        assert!(reader.is_complete());
1520    }
1521
1522    #[test]
1523    fn chunked_multiple_chunks() {
1524        let body = b"5\r\nHello\r\n7\r\n, World\r\n1\r\n!\r\n0\r\n\r\n";
1525        let config = BodyConfig::default();
1526        let mut reader = ChunkedReader::new(body, &config);
1527
1528        let result = reader.decode_all().unwrap();
1529        assert_eq!(result, b"Hello, World!");
1530        assert!(reader.is_complete());
1531    }
1532
1533    #[test]
1534    fn chunked_empty() {
1535        let body = b"0\r\n\r\n";
1536        let config = BodyConfig::default();
1537        let mut reader = ChunkedReader::new(body, &config);
1538
1539        let result = reader.decode_all().unwrap();
1540        assert!(result.is_empty());
1541        assert!(reader.is_complete());
1542    }
1543
1544    #[test]
1545    fn chunked_with_extension() {
1546        // Chunk extensions should be ignored
1547        let body = b"5;ext=value\r\nHello\r\n0\r\n\r\n";
1548        let config = BodyConfig::default();
1549        let mut reader = ChunkedReader::new(body, &config);
1550
1551        let result = reader.decode_all().unwrap();
1552        assert_eq!(result, b"Hello");
1553    }
1554
1555    #[test]
1556    fn chunked_with_trailers() {
1557        let body = b"5\r\nHello\r\n0\r\nTrailer: value\r\n\r\n";
1558        let config = BodyConfig::default();
1559        let mut reader = ChunkedReader::new(body, &config);
1560
1561        let result = reader.decode_all().unwrap();
1562        assert_eq!(result, b"Hello");
1563        assert!(reader.is_complete());
1564    }
1565
1566    #[test]
1567    fn chunked_hex_sizes() {
1568        // Test various hex chunk sizes
1569        let body = b"a\r\n0123456789\r\nF\r\n0123456789ABCDE\r\n0\r\n\r\n";
1570        let config = BodyConfig::default();
1571        let mut reader = ChunkedReader::new(body, &config);
1572
1573        let result = reader.decode_all().unwrap();
1574        assert_eq!(result.len(), 10 + 15); // a=10, F=15
1575    }
1576
1577    #[test]
1578    fn chunked_too_large() {
1579        let body = b"10\r\n0123456789ABCDEF\r\n0\r\n\r\n"; // 16 bytes
1580        let config = BodyConfig::new().with_max_size(10);
1581        let mut reader = ChunkedReader::new(body, &config);
1582
1583        let result = reader.decode_all();
1584        assert!(matches!(
1585            result,
1586            Err(BodyError::TooLarge { size: 16, max: 10 })
1587        ));
1588    }
1589
1590    #[test]
1591    fn chunked_invalid_size() {
1592        let body = b"xyz\r\nHello\r\n0\r\n\r\n";
1593        let config = BodyConfig::default();
1594        let mut reader = ChunkedReader::new(body, &config);
1595
1596        let result = reader.decode_all();
1597        assert!(matches!(
1598            result,
1599            Err(BodyError::InvalidChunkedEncoding { detail: _ })
1600        ));
1601    }
1602
1603    #[test]
1604    fn chunked_missing_crlf() {
1605        let body = b"5\r\nHelloX0\r\n\r\n"; // Missing CRLF after data
1606        let config = BodyConfig::default();
1607        let mut reader = ChunkedReader::new(body, &config);
1608
1609        let result = reader.decode_all();
1610        assert!(matches!(
1611            result,
1612            Err(BodyError::InvalidChunkedEncoding {
1613                detail: "expected CRLF after chunk data"
1614            })
1615        ));
1616    }
1617
1618    #[test]
1619    fn chunked_incomplete() {
1620        let body = b"5\r\nHel"; // Incomplete chunk data
1621        let config = BodyConfig::default();
1622        let mut reader = ChunkedReader::new(body, &config);
1623
1624        let result = reader.decode_all();
1625        assert!(matches!(result, Err(BodyError::Incomplete { .. })));
1626    }
1627
1628    // ========================================================================
1629    // parse_body Tests
1630    // ========================================================================
1631
1632    #[test]
1633    fn parse_body_none() {
1634        let config = BodyConfig::default();
1635        let result = parse_body(b"ignored", BodyLength::None, &config).unwrap();
1636        assert!(result.is_none());
1637    }
1638
1639    #[test]
1640    fn parse_body_content_length() {
1641        let config = BodyConfig::default();
1642        let result = parse_body(b"Hello, World!", BodyLength::ContentLength(13), &config).unwrap();
1643        assert_eq!(result.unwrap(), b"Hello, World!");
1644    }
1645
1646    #[test]
1647    fn parse_body_content_length_zero() {
1648        let config = BodyConfig::default();
1649        let result = parse_body(b"", BodyLength::ContentLength(0), &config).unwrap();
1650        assert_eq!(result.unwrap(), b"");
1651    }
1652
1653    #[test]
1654    fn parse_body_chunked() {
1655        let config = BodyConfig::default();
1656        let result = parse_body(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config).unwrap();
1657        assert_eq!(result.unwrap(), b"Hello");
1658    }
1659
1660    #[test]
1661    fn parse_body_with_consumed_content_length() {
1662        let config = BodyConfig::default();
1663        let (body, consumed) =
1664            parse_body_with_consumed(b"Hello, World!", BodyLength::ContentLength(13), &config)
1665                .unwrap();
1666        assert_eq!(body.unwrap(), b"Hello, World!");
1667        assert_eq!(consumed, 13);
1668    }
1669
1670    #[test]
1671    fn parse_body_with_consumed_chunked() {
1672        let config = BodyConfig::default();
1673        let (body, consumed) =
1674            parse_body_with_consumed(b"5\r\nHello\r\n0\r\n\r\n", BodyLength::Chunked, &config)
1675                .unwrap();
1676        assert_eq!(body.unwrap(), b"Hello");
1677        assert_eq!(consumed, 15);
1678    }
1679
1680    // ========================================================================
1681    // validate_content_length Tests
1682    // ========================================================================
1683
1684    #[test]
1685    fn validate_content_length_ok() {
1686        let config = BodyConfig::new().with_max_size(1000);
1687        assert!(validate_content_length(500, &config).is_ok());
1688        assert!(validate_content_length(1000, &config).is_ok());
1689    }
1690
1691    #[test]
1692    fn validate_content_length_too_large() {
1693        let config = BodyConfig::new().with_max_size(1000);
1694        let result = validate_content_length(1001, &config);
1695        assert!(matches!(
1696            result,
1697            Err(BodyError::TooLarge {
1698                size: 1001,
1699                max: 1000
1700            })
1701        ));
1702    }
1703
1704    // ========================================================================
1705    // BodyError Tests
1706    // ========================================================================
1707
1708    #[test]
1709    fn body_error_display() {
1710        let err = BodyError::TooLarge {
1711            size: 2000,
1712            max: 1000,
1713        };
1714        assert_eq!(
1715            format!("{err}"),
1716            "body too large: 2000 bytes exceeds limit of 1000"
1717        );
1718
1719        let err = BodyError::InvalidChunkedEncoding {
1720            detail: "bad format",
1721        };
1722        assert_eq!(format!("{err}"), "invalid chunked encoding: bad format");
1723
1724        let err = BodyError::Incomplete {
1725            received: 50,
1726            expected: Some(100),
1727        };
1728        assert_eq!(
1729            format!("{err}"),
1730            "incomplete body: received 50 of 100 bytes"
1731        );
1732
1733        let err = BodyError::UnexpectedEof;
1734        assert_eq!(format!("{err}"), "unexpected end of body");
1735    }
1736
1737    // ========================================================================
1738    // StreamingBodyConfig Tests
1739    // ========================================================================
1740
1741    #[test]
1742    fn streaming_body_config_defaults() {
1743        let config = StreamingBodyConfig::default();
1744        assert_eq!(config.streaming_threshold, DEFAULT_STREAMING_THRESHOLD);
1745        assert_eq!(config.chunk_size, 8 * 1024);
1746        assert_eq!(config.max_size, DEFAULT_MAX_BODY_SIZE);
1747    }
1748
1749    #[test]
1750    fn streaming_body_config_custom() {
1751        let config = StreamingBodyConfig::new()
1752            .with_streaming_threshold(1024)
1753            .with_chunk_size(4096)
1754            .with_max_size(10_000);
1755        assert_eq!(config.streaming_threshold, 1024);
1756        assert_eq!(config.chunk_size, 4096);
1757        assert_eq!(config.max_size, 10_000);
1758    }
1759
1760    #[test]
1761    fn streaming_body_config_minimum_chunk_size() {
1762        let config = StreamingBodyConfig::new().with_chunk_size(0);
1763        // Should be clamped to minimum of 1 byte
1764        assert_eq!(config.chunk_size, 1);
1765    }
1766
1767    #[test]
1768    fn streaming_body_config_should_stream() {
1769        let config = StreamingBodyConfig::new().with_streaming_threshold(1000);
1770        assert!(!config.should_stream(500));
1771        assert!(!config.should_stream(1000));
1772        assert!(config.should_stream(1001));
1773        assert!(config.should_stream(10000));
1774    }
1775
1776    // ========================================================================
1777    // AsyncContentLengthStream Tests
1778    // ========================================================================
1779
1780    #[test]
1781    fn async_content_length_stream_from_buffer() {
1782        use std::sync::Arc;
1783        use std::task::{Wake, Waker};
1784
1785        struct NoopWaker;
1786        impl Wake for NoopWaker {
1787            fn wake(self: Arc<Self>) {}
1788        }
1789
1790        fn noop_waker() -> Waker {
1791            Waker::from(Arc::new(NoopWaker))
1792        }
1793
1794        // Create a mock reader that won't be used (buffer is complete)
1795        struct EmptyReader;
1796        impl AsyncRead for EmptyReader {
1797            fn poll_read(
1798                self: Pin<&mut Self>,
1799                _cx: &mut Context<'_>,
1800                _buf: &mut asupersync::io::ReadBuf<'_>,
1801            ) -> Poll<std::io::Result<()>> {
1802                Poll::Ready(Ok(()))
1803            }
1804        }
1805
1806        let buffer = b"Hello, World!".to_vec();
1807        let config = StreamingBodyConfig::new().with_chunk_size(5);
1808        let mut stream = AsyncContentLengthStream::new(buffer, EmptyReader, 13, &config);
1809
1810        assert_eq!(stream.expected_size(), 13);
1811        assert_eq!(stream.bytes_read(), 0);
1812        assert_eq!(stream.remaining(), 13);
1813
1814        let waker = noop_waker();
1815        let mut cx = Context::from_waker(&waker);
1816
1817        // First chunk: "Hello"
1818        let result = Pin::new(&mut stream).poll_next(&mut cx);
1819        match result {
1820            Poll::Ready(Some(Ok(chunk))) => {
1821                assert_eq!(chunk, b"Hello");
1822            }
1823            _ => panic!("expected chunk"),
1824        }
1825        assert_eq!(stream.bytes_read(), 5);
1826
1827        // Second chunk: ", Wor"
1828        let result = Pin::new(&mut stream).poll_next(&mut cx);
1829        match result {
1830            Poll::Ready(Some(Ok(chunk))) => {
1831                assert_eq!(chunk, b", Wor");
1832            }
1833            _ => panic!("expected chunk"),
1834        }
1835
1836        // Third chunk: "ld!"
1837        let result = Pin::new(&mut stream).poll_next(&mut cx);
1838        match result {
1839            Poll::Ready(Some(Ok(chunk))) => {
1840                assert_eq!(chunk, b"ld!");
1841            }
1842            _ => panic!("expected chunk"),
1843        }
1844
1845        // End of stream
1846        let result = Pin::new(&mut stream).poll_next(&mut cx);
1847        assert!(matches!(result, Poll::Ready(None)));
1848        assert!(stream.is_complete());
1849    }
1850
1851    #[test]
1852    fn async_content_length_stream_enforces_max_size() {
1853        use std::io::Cursor;
1854        use std::sync::Arc;
1855        use std::task::{Wake, Waker};
1856
1857        struct NoopWaker;
1858        impl Wake for NoopWaker {
1859            fn wake(self: Arc<Self>) {}
1860        }
1861
1862        fn noop_waker() -> Waker {
1863            Waker::from(Arc::new(NoopWaker))
1864        }
1865
1866        let initial = b"123456".to_vec();
1867        let reader = Cursor::new(b"abcdef".to_vec());
1868        let config = StreamingBodyConfig::new()
1869            .with_chunk_size(8)
1870            .with_max_size(10);
1871        let mut stream = AsyncContentLengthStream::new(initial, reader, 12, &config);
1872
1873        let waker = noop_waker();
1874        let mut cx = Context::from_waker(&waker);
1875
1876        // First 6 bytes come from initial buffer.
1877        let result = Pin::new(&mut stream).poll_next(&mut cx);
1878        match result {
1879            Poll::Ready(Some(Ok(chunk))) => assert_eq!(chunk, b"123456"),
1880            _ => panic!("expected initial chunk"),
1881        }
1882
1883        // Stream can still emit bytes up to max_size.
1884        let result = Pin::new(&mut stream).poll_next(&mut cx);
1885        match result {
1886            Poll::Ready(Some(Ok(chunk))) => assert_eq!(chunk, b"abcd"),
1887            _ => panic!("expected bounded reader chunk"),
1888        }
1889
1890        // Next poll must fail because body is larger than max_size.
1891        let result = Pin::new(&mut stream).poll_next(&mut cx);
1892        match result {
1893            Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge { received, max }))) => {
1894                assert_eq!(received, 11);
1895                assert_eq!(max, 10);
1896            }
1897            _ => panic!("expected TooLarge error, got {:?}", result),
1898        }
1899    }
1900
1901    // ========================================================================
1902    // AsyncChunkedStream Tests
1903    // ========================================================================
1904
1905    #[test]
1906    fn async_chunked_stream_simple() {
1907        use std::sync::Arc;
1908        use std::task::{Wake, Waker};
1909
1910        struct NoopWaker;
1911        impl Wake for NoopWaker {
1912            fn wake(self: Arc<Self>) {}
1913        }
1914
1915        fn noop_waker() -> Waker {
1916            Waker::from(Arc::new(NoopWaker))
1917        }
1918
1919        struct EmptyReader;
1920        impl AsyncRead for EmptyReader {
1921            fn poll_read(
1922                self: Pin<&mut Self>,
1923                _cx: &mut Context<'_>,
1924                _buf: &mut asupersync::io::ReadBuf<'_>,
1925            ) -> Poll<std::io::Result<()>> {
1926                Poll::Ready(Ok(()))
1927            }
1928        }
1929
1930        // Complete chunked body in buffer: "Hello" in chunked encoding
1931        let buffer = b"5\r\nHello\r\n0\r\n\r\n".to_vec();
1932        let config = StreamingBodyConfig::new().with_chunk_size(1024);
1933        let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);
1934
1935        let waker = noop_waker();
1936        let mut cx = Context::from_waker(&waker);
1937
1938        // First chunk: "Hello"
1939        let result = Pin::new(&mut stream).poll_next(&mut cx);
1940        match result {
1941            Poll::Ready(Some(Ok(chunk))) => {
1942                assert_eq!(chunk, b"Hello");
1943            }
1944            _ => panic!("expected chunk, got {:?}", result),
1945        }
1946
1947        // Note: Need to poll again to process CRLF and next chunk size
1948        // The implementation returns Pending after transition, then processes on next poll
1949        let result = Pin::new(&mut stream).poll_next(&mut cx);
1950        // Should be complete (0\r\n\r\n)
1951        assert!(matches!(result, Poll::Ready(None)));
1952        assert!(stream.is_complete());
1953    }
1954
1955    #[test]
1956    fn async_chunked_stream_multiple_chunks() {
1957        use std::sync::Arc;
1958        use std::task::{Wake, Waker};
1959
1960        struct NoopWaker;
1961        impl Wake for NoopWaker {
1962            fn wake(self: Arc<Self>) {}
1963        }
1964
1965        fn noop_waker() -> Waker {
1966            Waker::from(Arc::new(NoopWaker))
1967        }
1968
1969        struct EmptyReader;
1970        impl AsyncRead for EmptyReader {
1971            fn poll_read(
1972                self: Pin<&mut Self>,
1973                _cx: &mut Context<'_>,
1974                _buf: &mut asupersync::io::ReadBuf<'_>,
1975            ) -> Poll<std::io::Result<()>> {
1976                Poll::Ready(Ok(()))
1977            }
1978        }
1979
1980        // "Hello, World!" in chunked encoding
1981        let buffer = b"5\r\nHello\r\n8\r\n, World!\r\n0\r\n\r\n".to_vec();
1982        let config = StreamingBodyConfig::new();
1983        let mut stream = AsyncChunkedStream::new(buffer, EmptyReader, &config);
1984
1985        let waker = noop_waker();
1986        let mut cx = Context::from_waker(&waker);
1987
1988        // Collect all chunks
1989        let mut collected = Vec::new();
1990        loop {
1991            match Pin::new(&mut stream).poll_next(&mut cx) {
1992                Poll::Ready(Some(Ok(chunk))) => collected.extend_from_slice(&chunk),
1993                Poll::Ready(Some(Err(e))) => panic!("unexpected error: {e}"),
1994                Poll::Ready(None) => break,
1995                Poll::Pending => {} // Continue processing
1996            }
1997        }
1998
1999        assert_eq!(collected, b"Hello, World!");
2000    }
2001}