Skip to main content

fastapi_core/
request.rs

1//! HTTP request types.
2
3use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::fmt;
6
/// Protocol version carried on an HTTP request line.
///
/// Only HTTP/1.0 and HTTP/1.1 are representable; HTTP/1.1 is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum HttpVersion {
    /// HTTP/1.0
    Http10,
    /// HTTP/1.1 (default)
    #[default]
    Http11,
}

impl HttpVersion {
    /// Parse a version token such as `"HTTP/1.1"`.
    ///
    /// The comparison is exact (case-sensitive, no trimming). Returns `None`
    /// for any string that is not one of the two supported versions.
    #[must_use]
    pub fn parse(s: &str) -> Option<Self> {
        [Self::Http10, Self::Http11]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }

    /// Returns true if this is HTTP/1.1.
    #[must_use]
    pub fn is_http11(self) -> bool {
        self == Self::Http11
    }

    /// Returns true if this is HTTP/1.0.
    #[must_use]
    pub fn is_http10(self) -> bool {
        self == Self::Http10
    }

    /// Returns the canonical wire representation, e.g. `"HTTP/1.1"`.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Http11 => "HTTP/1.1",
            Self::Http10 => "HTTP/1.0",
        }
    }
}

impl std::str::FromStr for HttpVersion {
    type Err = ();

    /// Same matching rules as [`HttpVersion::parse`]; unknown input yields `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match Self::parse(s) {
            Some(version) => Ok(version),
            None => Err(()),
        }
    }
}

impl fmt::Display for HttpVersion {
    /// Writes the same text as [`HttpVersion::as_str`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = self.as_str();
        f.write_str(text)
    }
}
63
/// HTTP request method.
///
/// Only the eight methods listed here are representable; any other token is
/// rejected by [`Method::from_bytes`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Method {
    /// GET method.
    Get,
    /// POST method.
    Post,
    /// PUT method.
    Put,
    /// DELETE method.
    Delete,
    /// PATCH method.
    Patch,
    /// OPTIONS method.
    Options,
    /// HEAD method.
    Head,
    /// TRACE method.
    Trace,
}

impl Method {
    /// Parse a method from its raw request-line bytes.
    ///
    /// The match is exact and case-sensitive: only the uppercase names
    /// returned by [`Method::as_str`] are accepted. Returns `None` otherwise.
    #[must_use]
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        const KNOWN: [Method; 8] = [
            Method::Get,
            Method::Post,
            Method::Put,
            Method::Delete,
            Method::Patch,
            Method::Options,
            Method::Head,
            Method::Trace,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|method| method.as_str().as_bytes() == bytes)
    }

    /// Return the canonical uppercase method name.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Trace => "TRACE",
            Self::Head => "HEAD",
            Self::Options => "OPTIONS",
            Self::Patch => "PATCH",
            Self::Delete => "DELETE",
            Self::Put => "PUT",
            Self::Post => "POST",
            Self::Get => "GET",
        }
    }
}

impl fmt::Display for Method {
    /// Writes the same text as [`Method::as_str`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
123
/// HTTP headers collection.
///
/// Header names are normalized to ASCII lowercase at insertion time so that
/// lookups are case-insensitive. Each name maps to exactly one value:
/// inserting a name that is already present replaces the previous value.
/// Lookups avoid allocation when the lookup key is already lowercase.
#[derive(Debug, Default)]
pub struct Headers {
    /// Lowercased header name -> raw header value bytes.
    inner: HashMap<String, Vec<u8>>,
}

impl Headers {
    /// Create empty headers.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Get a header value by name (case-insensitive).
    ///
    /// Avoids heap allocation when the lookup key is already lowercase.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<&[u8]> {
        let key = lowercase_header_key(name);
        self.inner.get(&*key).map(Vec::as_slice)
    }

    /// Insert a header, replacing any existing value stored under the same name.
    ///
    /// The header name is normalized to lowercase.
    pub fn insert(&mut self, name: impl Into<String>, value: impl Into<Vec<u8>>) {
        // Lowercase in place: reuses the buffer produced by `into()` instead of
        // allocating a second `String` for the normalized name.
        let mut name = name.into();
        name.make_ascii_lowercase();
        self.inner.insert(name, value.into());
    }

    /// Insert a header from borrowed slices.
    ///
    /// Intended for parsers that hold borrowed views into a request buffer.
    /// Performs exactly one allocation for the (lowercased) name and one for
    /// the value copy. Any existing value under the same name is replaced.
    #[inline]
    pub fn insert_from_slice(&mut self, name: &str, value: &[u8]) {
        // `to_ascii_lowercase` copies and lowercases in a single pass, so there
        // is nothing to gain from scanning for uppercase bytes first.
        self.inner
            .insert(name.to_ascii_lowercase(), value.to_vec());
    }

    /// Insert a header with an already-lowercase name.
    ///
    /// # Safety Note
    ///
    /// This method assumes the name is already lowercase. If it contains
    /// uppercase characters, lookups may fail. Use `insert` or
    /// `insert_from_slice` for untrusted input. The precondition is checked
    /// with a `debug_assert!`, i.e. only in builds with debug assertions.
    #[inline]
    pub fn insert_lowercase(&mut self, name: String, value: Vec<u8>) {
        debug_assert!(
            !name.bytes().any(|b| b.is_ascii_uppercase()),
            "insert_lowercase called with non-lowercase name: {}",
            name
        );
        self.inner.insert(name, value);
    }

    /// Iterate over all headers as (name, value) pairs.
    ///
    /// Names are yielded in their stored (lowercase) form; the order follows
    /// the underlying `HashMap` and is therefore unspecified.
    pub fn iter(&self) -> impl Iterator<Item = (&str, &[u8])> {
        self.inner
            .iter()
            .map(|(name, value)| (name.as_str(), value.as_slice()))
    }

    /// Returns the number of headers.
    #[must_use]
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns true if there are no headers.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Remove a header by name (case-insensitive).
    ///
    /// Returns the removed value, if any.
    pub fn remove(&mut self, name: &str) -> Option<Vec<u8>> {
        let key = lowercase_header_key(name);
        self.inner.remove(&*key)
    }

    /// Check if a header exists (case-insensitive).
    #[must_use]
    pub fn contains(&self, name: &str) -> bool {
        let key = lowercase_header_key(name);
        self.inner.contains_key(&*key)
    }
}

/// Lowercase a header name for lookup.
///
/// Returns a `Cow<str>` that is:
/// - **Borrowed** if the name contains no ASCII uppercase bytes (zero allocation)
/// - **Owned** if uppercase characters need conversion
///
/// Only ASCII letters are affected; non-ASCII bytes are left untouched, which
/// matches the normalization applied by the insertion methods.
#[inline]
fn lowercase_header_key(name: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;

    if name.bytes().any(|b| b.is_ascii_uppercase()) {
        Cow::Owned(name.to_ascii_lowercase())
    } else {
        // Fast path: callers that pass lowercase literals pay no allocation.
        Cow::Borrowed(name)
    }
}
247
248/// Request body.
249#[derive(Debug)]
250pub enum Body {
251    /// Empty body.
252    Empty,
253    /// Bytes body.
254    Bytes(Vec<u8>),
255    /// Streaming body for large uploads.
256    ///
257    /// This variant enables memory-efficient handling of large request bodies
258    /// by yielding chunks incrementally rather than buffering the entire content.
259    ///
260    /// The stream yields `Result<Vec<u8>, RequestBodyStreamError>` chunks.
261    Stream(RequestBodyStream),
262}
263
/// Failure modes that can surface while reading a streaming request body.
#[derive(Debug)]
pub enum RequestBodyStreamError {
    /// Connection was closed before body was complete.
    ConnectionClosed,
    /// Timeout while waiting for body data.
    Timeout,
    /// Body exceeded configured size limit.
    TooLarge {
        /// Number of bytes reported as received.
        received: usize,
        /// The limit that was exceeded.
        max: usize,
    },
    /// I/O error during streaming, carried as a message string.
    Io(String),
}

impl std::fmt::Display for RequestBodyStreamError {
    /// Renders a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(msg) => write!(f, "I/O error: {msg}"),
            Self::TooLarge { received, max } => {
                write!(f, "body too large: {received} bytes exceeds limit of {max}")
            }
            Self::Timeout => f.write_str("timeout waiting for body data"),
            Self::ConnectionClosed => f.write_str("connection closed"),
        }
    }
}

impl std::error::Error for RequestBodyStreamError {}
291
292/// A streaming request body.
293///
294/// This provides an async interface for reading request body chunks
295/// without buffering the entire body in memory.
296///
297/// # Example
298///
299/// ```ignore
300/// use fastapi_core::{Body, RequestBodyStream};
301///
302/// async fn handle_upload(body: Body) -> Vec<u8> {
303///     match body {
304///         Body::Stream(mut stream) => {
305///             let mut buffer = Vec::new();
306///             while let Some(chunk) = stream.next().await {
307///                 buffer.extend_from_slice(&chunk?);
308///             }
309///             buffer
310///         }
311///         Body::Bytes(bytes) => bytes,
312///         Body::Empty => Vec::new(),
313///     }
314/// }
315/// ```
316pub struct RequestBodyStream {
317    /// The inner stream of chunks.
318    inner: std::pin::Pin<
319        Box<
320            dyn asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
321                + Send
322                + Sync,
323        >,
324    >,
325    /// Total bytes received so far.
326    bytes_received: usize,
327    /// Expected total size (from Content-Length), if known.
328    expected_size: Option<usize>,
329    /// Whether the stream is complete.
330    complete: bool,
331}
332
333impl std::fmt::Debug for RequestBodyStream {
334    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335        f.debug_struct("RequestBodyStream")
336            .field("bytes_received", &self.bytes_received)
337            .field("expected_size", &self.expected_size)
338            .field("complete", &self.complete)
339            .finish_non_exhaustive()
340    }
341}
342
343impl RequestBodyStream {
344    /// Create a new body stream from an async stream of chunks.
345    pub fn new<S>(stream: S) -> Self
346    where
347        S: asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
348            + Send
349            + Sync
350            + 'static,
351    {
352        Self {
353            inner: Box::pin(stream),
354            bytes_received: 0,
355            expected_size: None,
356            complete: false,
357        }
358    }
359
360    /// Create a body stream with a known expected size.
361    pub fn with_expected_size<S>(stream: S, expected_size: usize) -> Self
362    where
363        S: asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
364            + Send
365            + Sync
366            + 'static,
367    {
368        Self {
369            inner: Box::pin(stream),
370            bytes_received: 0,
371            expected_size: Some(expected_size),
372            complete: false,
373        }
374    }
375
376    /// Returns the number of bytes received so far.
377    #[must_use]
378    pub fn bytes_received(&self) -> usize {
379        self.bytes_received
380    }
381
382    /// Returns the expected total size, if known.
383    #[must_use]
384    pub fn expected_size(&self) -> Option<usize> {
385        self.expected_size
386    }
387
388    /// Returns true if the stream is complete.
389    #[must_use]
390    pub fn is_complete(&self) -> bool {
391        self.complete
392    }
393
394    /// Collect all chunks into a single buffer.
395    ///
396    /// This consumes the stream and buffers the entire body in memory.
397    /// Use this for small bodies or when the full content is needed.
398    ///
399    /// For large bodies, prefer processing chunks individually via `next()`.
400    pub async fn collect(mut self) -> Result<Vec<u8>, RequestBodyStreamError> {
401        use asupersync::stream::StreamExt;
402
403        let capacity = self.expected_size.unwrap_or(4096);
404        let mut buffer = Vec::with_capacity(capacity);
405
406        while let Some(chunk) = self.inner.next().await {
407            buffer.extend_from_slice(&chunk?);
408            self.bytes_received = buffer.len();
409        }
410
411        self.complete = true;
412        Ok(buffer)
413    }
414}
415
416impl asupersync::stream::Stream for RequestBodyStream {
417    type Item = Result<Vec<u8>, RequestBodyStreamError>;
418
419    fn poll_next(
420        mut self: std::pin::Pin<&mut Self>,
421        cx: &mut std::task::Context<'_>,
422    ) -> std::task::Poll<Option<Self::Item>> {
423        if self.complete {
424            return std::task::Poll::Ready(None);
425        }
426
427        match self.inner.as_mut().poll_next(cx) {
428            std::task::Poll::Ready(Some(Ok(chunk))) => {
429                self.bytes_received += chunk.len();
430                std::task::Poll::Ready(Some(Ok(chunk)))
431            }
432            std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
433            std::task::Poll::Ready(None) => {
434                self.complete = true;
435                std::task::Poll::Ready(None)
436            }
437            std::task::Poll::Pending => std::task::Poll::Pending,
438        }
439    }
440}
441
442impl Body {
443    /// Get body as bytes, consuming it.
444    ///
445    /// For `Body::Stream`, this will panic. Use `into_bytes_async()` instead
446    /// for streaming bodies, or check with `is_streaming()` first.
447    #[must_use]
448    pub fn into_bytes(self) -> Vec<u8> {
449        match self {
450            Self::Empty => Vec::new(),
451            Self::Bytes(b) => b,
452            Self::Stream(_) => panic!(
453                "cannot synchronously convert streaming body to bytes; use into_bytes_async()"
454            ),
455        }
456    }
457
458    /// Get body as bytes asynchronously, consuming it.
459    ///
460    /// This works for all body types:
461    /// - `Empty` returns an empty Vec
462    /// - `Bytes` returns the bytes
463    /// - `Stream` collects all chunks into a Vec
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if the stream encounters an error while reading.
468    pub async fn into_bytes_async(self) -> Result<Vec<u8>, RequestBodyStreamError> {
469        match self {
470            Self::Empty => Ok(Vec::new()),
471            Self::Bytes(b) => Ok(b),
472            Self::Stream(stream) => stream.collect().await,
473        }
474    }
475
476    /// Check if body is empty.
477    #[must_use]
478    pub fn is_empty(&self) -> bool {
479        match self {
480            Self::Empty => true,
481            Self::Bytes(b) => b.is_empty(),
482            Self::Stream(s) => s.is_complete() && s.bytes_received() == 0,
483        }
484    }
485
486    /// Check if body is a streaming body.
487    #[must_use]
488    pub fn is_streaming(&self) -> bool {
489        matches!(self, Self::Stream(_))
490    }
491
492    /// Take the body stream, if this is a streaming body.
493    ///
494    /// Returns `None` for `Empty` and `Bytes` variants.
495    #[must_use]
496    pub fn take_stream(self) -> Option<RequestBodyStream> {
497        match self {
498            Self::Stream(s) => Some(s),
499            _ => None,
500        }
501    }
502
503    /// Create a streaming body.
504    pub fn streaming<S>(stream: S) -> Self
505    where
506        S: asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
507            + Send
508            + Sync
509            + 'static,
510    {
511        Self::Stream(RequestBodyStream::new(stream))
512    }
513
514    /// Create a streaming body with a known size.
515    pub fn streaming_with_size<S>(stream: S, size: usize) -> Self
516    where
517        S: asupersync::stream::Stream<Item = Result<Vec<u8>, RequestBodyStreamError>>
518            + Send
519            + Sync
520            + 'static,
521    {
522        Self::Stream(RequestBodyStream::with_expected_size(stream, size))
523    }
524}
525
526/// HTTP request.
527#[derive(Debug)]
528pub struct Request {
529    method: Method,
530    path: String,
531    query: Option<String>,
532    version: HttpVersion,
533    headers: Headers,
534    body: Body,
535    // Extensions for middleware/extractors
536    #[allow(dead_code)] // Used in future implementation
537    extensions: HashMap<std::any::TypeId, Box<dyn std::any::Any + Send + Sync>>,
538}
539
540impl Request {
541    /// Create a new request.
542    #[must_use]
543    pub fn new(method: Method, path: impl Into<String>) -> Self {
544        Self {
545            method,
546            path: path.into(),
547            query: None,
548            version: HttpVersion::default(),
549            headers: Headers::new(),
550            body: Body::Empty,
551            extensions: HashMap::new(),
552        }
553    }
554
555    /// Create a new request with a specific HTTP version.
556    #[must_use]
557    pub fn with_version(method: Method, path: impl Into<String>, version: HttpVersion) -> Self {
558        Self {
559            method,
560            path: path.into(),
561            query: None,
562            version,
563            headers: Headers::new(),
564            body: Body::Empty,
565            extensions: HashMap::new(),
566        }
567    }
568
569    /// Get the HTTP version.
570    #[must_use]
571    pub fn version(&self) -> HttpVersion {
572        self.version
573    }
574
575    /// Set the HTTP version.
576    pub fn set_version(&mut self, version: HttpVersion) {
577        self.version = version;
578    }
579
580    /// Get the HTTP method.
581    #[must_use]
582    pub fn method(&self) -> Method {
583        self.method
584    }
585
586    /// Get the request path.
587    #[must_use]
588    pub fn path(&self) -> &str {
589        &self.path
590    }
591
592    /// Set the request path.
593    ///
594    /// This is used internally for mounted sub-applications, where the
595    /// mount prefix is stripped from the path before forwarding.
596    pub fn set_path(&mut self, path: String) {
597        self.path = path;
598    }
599
600    /// Get the query string.
601    #[must_use]
602    pub fn query(&self) -> Option<&str> {
603        self.query.as_deref()
604    }
605
606    /// Get the headers.
607    #[must_use]
608    pub fn headers(&self) -> &Headers {
609        &self.headers
610    }
611
612    /// Get mutable headers.
613    pub fn headers_mut(&mut self) -> &mut Headers {
614        &mut self.headers
615    }
616
617    /// Get the body.
618    #[must_use]
619    pub fn body(&self) -> &Body {
620        &self.body
621    }
622
623    /// Take the body, replacing with Empty.
624    pub fn take_body(&mut self) -> Body {
625        std::mem::replace(&mut self.body, Body::Empty)
626    }
627
628    /// Set the body.
629    pub fn set_body(&mut self, body: Body) {
630        self.body = body;
631    }
632
633    /// Set the query string.
634    pub fn set_query(&mut self, query: Option<String>) {
635        self.query = query;
636    }
637
638    /// Insert a typed extension value.
639    pub fn insert_extension<T: Any + Send + Sync>(&mut self, value: T) {
640        self.extensions.insert(TypeId::of::<T>(), Box::new(value));
641    }
642
643    /// Get a typed extension value.
644    #[must_use]
645    pub fn get_extension<T: Any + Send + Sync>(&self) -> Option<&T> {
646        self.extensions
647            .get(&TypeId::of::<T>())
648            .and_then(|boxed| boxed.downcast_ref::<T>())
649    }
650
651    /// Get a mutable typed extension value.
652    pub fn get_extension_mut<T: Any + Send + Sync>(&mut self) -> Option<&mut T> {
653        self.extensions
654            .get_mut(&TypeId::of::<T>())
655            .and_then(|boxed| boxed.downcast_mut::<T>())
656    }
657}
658
659#[cfg(test)]
660mod tests {
661    use super::*;
662    use asupersync::stream::Stream;
663    use std::pin::Pin;
664    use std::sync::Arc;
665    use std::task::{Context, Poll, Wake, Waker};
666
667    struct NoopWaker;
668
669    impl Wake for NoopWaker {
670        fn wake(self: Arc<Self>) {}
671    }
672
673    fn noop_waker() -> Waker {
674        Waker::from(Arc::new(NoopWaker))
675    }
676
677    // ============================================================
678    // bd-isux: RequestBodyStream tests for large uploads
679    // ============================================================
680
681    #[test]
682    fn stream_10mb_body_in_64kb_chunks() {
683        // Test streaming a 10MB request body in chunks (bd-isux)
684        const TARGET_SIZE: usize = 10 * 1024 * 1024; // 10MB
685        const CHUNK_SIZE: usize = 64 * 1024; // 64KB chunks
686
687        // Create chunks that total 10MB
688        let num_chunks = TARGET_SIZE.div_ceil(CHUNK_SIZE);
689        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = (0..num_chunks)
690            .map(|i| {
691                let start = i * CHUNK_SIZE;
692                let end = std::cmp::min(start + CHUNK_SIZE, TARGET_SIZE);
693                let chunk: Vec<u8> = (start..end).map(|j| (j % 256) as u8).collect();
694                Ok(chunk)
695            })
696            .collect();
697
698        let stream = asupersync::stream::iter(chunks);
699        let mut body_stream = RequestBodyStream::with_expected_size(stream, TARGET_SIZE);
700
701        let waker = noop_waker();
702        let mut ctx = Context::from_waker(&waker);
703
704        let mut total_received = 0usize;
705        let mut chunk_count = 0usize;
706
707        loop {
708            match Pin::new(&mut body_stream).poll_next(&mut ctx) {
709                Poll::Ready(Some(Ok(chunk))) => {
710                    total_received += chunk.len();
711                    chunk_count += 1;
712                }
713                Poll::Ready(Some(Err(e))) => panic!("Unexpected error: {e}"),
714                Poll::Ready(None) => break,
715                Poll::Pending => panic!("Mock stream should never return Pending"),
716            }
717        }
718
719        assert_eq!(total_received, TARGET_SIZE, "Should receive all 10MB");
720        assert_eq!(
721            chunk_count, num_chunks,
722            "Should have correct number of chunks"
723        );
724        assert!(
725            body_stream.is_complete(),
726            "Stream should be marked complete"
727        );
728        assert_eq!(
729            body_stream.bytes_received(),
730            TARGET_SIZE,
731            "bytes_received should match"
732        );
733    }
734
735    #[test]
736    fn stream_memory_bounded_during_processing() {
737        // Test that streaming doesn't buffer entire body (bd-isux)
738        // Verify incremental processing with memory < 1MB at any point
739        const TARGET_SIZE: usize = 5 * 1024 * 1024; // 5MB total
740        const CHUNK_SIZE: usize = 64 * 1024; // 64KB chunks
741        const MAX_MEMORY: usize = 1024 * 1024; // 1MB max
742
743        let num_chunks = TARGET_SIZE / CHUNK_SIZE;
744        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
745            (0..num_chunks).map(|_| Ok(vec![0u8; CHUNK_SIZE])).collect();
746
747        let stream = asupersync::stream::iter(chunks);
748        let mut body_stream = RequestBodyStream::new(stream);
749
750        let waker = noop_waker();
751        let mut ctx = Context::from_waker(&waker);
752
753        // Process chunks incrementally, simulating bounded memory usage
754        let mut processed_total = 0usize;
755        let mut max_held = 0usize;
756
757        loop {
758            match Pin::new(&mut body_stream).poll_next(&mut ctx) {
759                Poll::Ready(Some(Ok(chunk))) => {
760                    // Simulate processing each chunk without accumulating
761                    let chunk_size = chunk.len();
762                    max_held = std::cmp::max(max_held, chunk_size);
763                    processed_total += chunk_size;
764                    // chunk goes out of scope here, releasing memory
765                }
766                Poll::Ready(Some(Err(e))) => panic!("Unexpected error: {e}"),
767                Poll::Ready(None) => break,
768                Poll::Pending => panic!("Mock stream should never return Pending"),
769            }
770        }
771
772        assert_eq!(processed_total, TARGET_SIZE, "Should process all data");
773        assert!(
774            max_held <= MAX_MEMORY,
775            "Max memory held per chunk ({max_held}) should be < {MAX_MEMORY}"
776        );
777    }
778
779    #[test]
780    fn stream_error_connection_closed() {
781        // Test RequestBodyStreamError::ConnectionClosed (bd-isux)
782        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![
783            Ok(vec![1, 2, 3]),
784            Err(RequestBodyStreamError::ConnectionClosed),
785            Ok(vec![4, 5, 6]), // Should not reach this
786        ];
787
788        let stream = asupersync::stream::iter(chunks);
789        let mut body_stream = RequestBodyStream::new(stream);
790
791        let waker = noop_waker();
792        let mut ctx = Context::from_waker(&waker);
793
794        // First chunk succeeds
795        match Pin::new(&mut body_stream).poll_next(&mut ctx) {
796            Poll::Ready(Some(Ok(chunk))) => assert_eq!(chunk, vec![1, 2, 3]),
797            other => panic!("Expected first chunk, got {other:?}"),
798        }
799
800        // Second chunk is error
801        match Pin::new(&mut body_stream).poll_next(&mut ctx) {
802            Poll::Ready(Some(Err(RequestBodyStreamError::ConnectionClosed))) => {}
803            other => panic!("Expected ConnectionClosed error, got {other:?}"),
804        }
805    }
806
807    #[test]
808    fn stream_error_timeout() {
809        // Test RequestBodyStreamError::Timeout (bd-isux)
810        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
811            vec![Ok(vec![1, 2]), Err(RequestBodyStreamError::Timeout)];
812
813        let stream = asupersync::stream::iter(chunks);
814        let mut body_stream = RequestBodyStream::new(stream);
815
816        let waker = noop_waker();
817        let mut ctx = Context::from_waker(&waker);
818
819        // First chunk succeeds
820        match Pin::new(&mut body_stream).poll_next(&mut ctx) {
821            Poll::Ready(Some(Ok(_))) => {}
822            other => panic!("Expected first chunk, got {other:?}"),
823        }
824
825        // Second is timeout
826        match Pin::new(&mut body_stream).poll_next(&mut ctx) {
827            Poll::Ready(Some(Err(RequestBodyStreamError::Timeout))) => {}
828            other => panic!("Expected Timeout error, got {other:?}"),
829        }
830
831        // Verify error display
832        let err = RequestBodyStreamError::Timeout;
833        assert_eq!(format!("{err}"), "timeout waiting for body data");
834    }
835
836    #[test]
837    fn stream_error_too_large() {
838        // Test RequestBodyStreamError::TooLarge (bd-isux)
839        let err = RequestBodyStreamError::TooLarge {
840            received: 10_000_000,
841            max: 1_000_000,
842        };
843
844        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Err(err)];
845
846        let stream = asupersync::stream::iter(chunks);
847        let mut body_stream = RequestBodyStream::new(stream);
848
849        let waker = noop_waker();
850        let mut ctx = Context::from_waker(&waker);
851
852        match Pin::new(&mut body_stream).poll_next(&mut ctx) {
853            Poll::Ready(Some(Err(RequestBodyStreamError::TooLarge { received, max }))) => {
854                assert_eq!(received, 10_000_000);
855                assert_eq!(max, 1_000_000);
856            }
857            other => panic!("Expected TooLarge error, got {other:?}"),
858        }
859
860        // Verify error display
861        let err = RequestBodyStreamError::TooLarge {
862            received: 10_000_000,
863            max: 1_000_000,
864        };
865        assert!(format!("{err}").contains("10000000"));
866        assert!(format!("{err}").contains("1000000"));
867    }
868
869    #[test]
870    fn stream_error_io() {
871        // Test RequestBodyStreamError::Io (bd-isux)
872        let err = RequestBodyStreamError::Io("disk full".to_string());
873
874        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Err(err)];
875
876        let stream = asupersync::stream::iter(chunks);
877        let mut body_stream = RequestBodyStream::new(stream);
878
879        let waker = noop_waker();
880        let mut ctx = Context::from_waker(&waker);
881
882        match Pin::new(&mut body_stream).poll_next(&mut ctx) {
883            Poll::Ready(Some(Err(RequestBodyStreamError::Io(msg)))) => {
884                assert_eq!(msg, "disk full");
885            }
886            other => panic!("Expected Io error, got {other:?}"),
887        }
888
889        // Verify error display
890        let err = RequestBodyStreamError::Io("disk full".to_string());
891        assert!(format!("{err}").contains("disk full"));
892    }
893
894    #[test]
895    fn stream_expected_size_tracking() {
896        // Test expected_size is correctly tracked (bd-isux)
897        const EXPECTED: usize = 1024;
898
899        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
900            vec![Ok(vec![0u8; 512]), Ok(vec![0u8; 512])];
901
902        let stream = asupersync::stream::iter(chunks);
903        let body_stream = RequestBodyStream::with_expected_size(stream, EXPECTED);
904
905        assert_eq!(body_stream.expected_size(), Some(EXPECTED));
906        assert_eq!(body_stream.bytes_received(), 0);
907        assert!(!body_stream.is_complete());
908    }
909
910    #[test]
911    fn stream_collect_accumulates_all_chunks() {
912        // Test collect() method gathers all data (bd-isux)
913        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
914            vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5]), Ok(vec![6, 7, 8, 9])];
915
916        let stream = asupersync::stream::iter(chunks);
917        let body_stream = RequestBodyStream::new(stream);
918
919        let result = futures_executor::block_on(body_stream.collect());
920        assert!(result.is_ok());
921        let data = result.unwrap();
922        assert_eq!(data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
923    }
924
925    #[test]
926    fn stream_collect_propagates_error() {
927        // Test collect() stops and returns error (bd-isux)
928        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![
929            Ok(vec![1, 2, 3]),
930            Err(RequestBodyStreamError::ConnectionClosed),
931            Ok(vec![4, 5, 6]),
932        ];
933
934        let stream = asupersync::stream::iter(chunks);
935        let body_stream = RequestBodyStream::new(stream);
936
937        let result = futures_executor::block_on(body_stream.collect());
938        assert!(result.is_err());
939        match result {
940            Err(RequestBodyStreamError::ConnectionClosed) => {}
941            other => panic!("Expected ConnectionClosed, got {other:?}"),
942        }
943    }
944
945    #[test]
946    fn body_streaming_helper_creates_stream() {
947        // Test Body::streaming() helper (bd-isux)
948        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Ok(vec![1, 2, 3])];
949        let stream = asupersync::stream::iter(chunks);
950        let body = Body::streaming(stream);
951
952        assert!(body.is_streaming());
953        assert!(!body.is_empty());
954    }
955
956    #[test]
957    fn body_streaming_with_size_helper() {
958        // Test Body::streaming_with_size() helper (bd-isux)
959        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Ok(vec![1, 2, 3])];
960        let stream = asupersync::stream::iter(chunks);
961        let body = Body::streaming_with_size(stream, 3);
962
963        assert!(body.is_streaming());
964
965        if let Body::Stream(s) = body {
966            assert_eq!(s.expected_size(), Some(3));
967        } else {
968            panic!("Expected Body::Stream");
969        }
970    }
971
972    #[test]
973    fn body_into_bytes_async_handles_stream() {
974        // Test Body::into_bytes_async() for streaming bodies (bd-isux)
975        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> =
976            vec![Ok(vec![1, 2]), Ok(vec![3, 4])];
977        let stream = asupersync::stream::iter(chunks);
978        let body = Body::streaming(stream);
979
980        let result = futures_executor::block_on(body.into_bytes_async());
981        assert!(result.is_ok());
982        assert_eq!(result.unwrap(), vec![1, 2, 3, 4]);
983    }
984
985    #[test]
986    fn body_take_stream_extracts_stream() {
987        // Test Body::take_stream() (bd-isux)
988        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Ok(vec![1, 2, 3])];
989        let stream = asupersync::stream::iter(chunks);
990        let body = Body::streaming(stream);
991
992        let taken = body.take_stream();
993        assert!(taken.is_some());
994
995        // Non-streaming bodies return None
996        let empty_body = Body::Empty;
997        assert!(empty_body.take_stream().is_none());
998
999        let bytes_body = Body::Bytes(vec![1, 2, 3]);
1000        assert!(bytes_body.take_stream().is_none());
1001    }
1002
1003    #[test]
1004    #[should_panic(expected = "cannot synchronously convert streaming body")]
1005    fn body_into_bytes_panics_for_stream() {
1006        // Test that into_bytes() panics for streaming bodies (bd-isux)
1007        let chunks: Vec<Result<Vec<u8>, RequestBodyStreamError>> = vec![Ok(vec![1, 2, 3])];
1008        let stream = asupersync::stream::iter(chunks);
1009        let body = Body::streaming(stream);
1010
1011        let _ = body.into_bytes(); // Should panic
1012    }
1013
1014    // ============================================================
1015    // bd-3slp: Header lookup optimization tests
1016    // ============================================================
1017
1018    #[test]
1019    fn headers_lowercase_key_fast_path() {
1020        // Test that lowercase keys work without allocation (bd-3slp)
1021        let mut headers = Headers::new();
1022        headers.insert("content-type", b"application/json".to_vec());
1023
1024        // Lookup with lowercase - fast path (no allocation)
1025        assert!(headers.get("content-type").is_some());
1026        assert!(headers.contains("content-type"));
1027
1028        // Lookup with mixed case - still works but may allocate
1029        assert!(headers.get("Content-Type").is_some());
1030        assert!(headers.contains("CONTENT-TYPE"));
1031    }
1032
1033    #[test]
1034    fn headers_case_insensitive_lookup() {
1035        // Verify case-insensitive behavior is preserved (bd-3slp)
1036        let mut headers = Headers::new();
1037        headers.insert("X-Custom-Header", b"value".to_vec());
1038
1039        // All case variations should work
1040        assert_eq!(headers.get("x-custom-header"), Some(b"value".as_slice()));
1041        assert_eq!(headers.get("X-CUSTOM-HEADER"), Some(b"value".as_slice()));
1042        assert_eq!(headers.get("X-Custom-Header"), Some(b"value".as_slice()));
1043        assert_eq!(headers.get("x-CuStOm-HeAdEr"), Some(b"value".as_slice()));
1044    }
1045
1046    #[test]
1047    fn headers_remove_case_insensitive() {
1048        // Verify remove works with case insensitivity (bd-3slp)
1049        let mut headers = Headers::new();
1050        headers.insert("Authorization", b"Bearer token".to_vec());
1051
1052        // Remove with different case
1053        let removed = headers.remove("AUTHORIZATION");
1054        assert_eq!(removed, Some(b"Bearer token".to_vec()));
1055        assert!(!headers.contains("authorization"));
1056    }
1057
1058    #[test]
1059    fn lowercase_header_key_already_lowercase() {
1060        // Fast path test - already lowercase borrows original (bd-3slp)
1061        use std::borrow::Cow;
1062
1063        let result = lowercase_header_key("content-type");
1064        assert!(matches!(result, Cow::Borrowed(_)));
1065        assert_eq!(result.as_ref(), "content-type");
1066    }
1067
1068    #[test]
1069    fn lowercase_header_key_needs_conversion() {
1070        // Slow path test - uppercase chars need conversion (bd-3slp)
1071        use std::borrow::Cow;
1072
1073        let result = lowercase_header_key("Content-Type");
1074        assert!(matches!(result, Cow::Owned(_)));
1075        assert_eq!(result.as_ref(), "content-type");
1076    }
1077
1078    #[test]
1079    fn lowercase_header_key_all_uppercase() {
1080        use std::borrow::Cow;
1081
1082        let result = lowercase_header_key("CONTENT-TYPE");
1083        assert!(matches!(result, Cow::Owned(_)));
1084        assert_eq!(result.as_ref(), "content-type");
1085    }
1086}