Skip to main content

hitbox_http/
body.rs

1//! HTTP body buffering and streaming utilities for transparent caching.
2//!
3//! # Design Rationale: Cache Layer Transparency
4//!
5//! Hitbox aims to be a transparent caching layer - upstream services and clients
6//! should see the same behavior with or without the cache.
7//!
8//! ## Why `BufferedBody::Partial` exists
9//!
10//! When predicates inspect request/response bodies, they consume bytes from the stream.
11//! To maintain transparency, these bytes must be forwarded to upstream. Since you can't
12//! "un-read" from a stream, we must:
13//!
14//! 1. Buffer the consumed prefix
15//! 2. Preserve the unconsumed remaining stream
16//! 3. Replay prefix + remaining to upstream
17//!
18//! This enables:
19//! - Resumable uploads/downloads (e.g., large file transfers)
20//! - Accurate error reporting (errors occur at the same byte position)
21//! - Zero data loss or corruption
22//! - Support for partial transfer protocols (HTTP Range requests, etc.)
23//!
24//! ## Example: Large File Upload
25//!
26//! ```text
27//! Without cache:
28//!   Client → Upstream: 500MB uploaded, error at byte 300MB
29//!
30//! With transparent cache:
31//!   Client → Hitbox (reads 10MB for predicate) → Upstream
32//!   Upstream receives: 10MB (replayed) + 290MB (streamed) + error
33//!   Total: Same 300MB, same error position ✅
34//! ```
35//!
36//! ## Body States
37//!
38//! - **Complete**: Body was fully read and buffered (within configured size limits)
39//! - **Partial**: Body was partially read - contains buffered prefix plus remaining stream or error
40//! - **Passthrough**: Body was not inspected at all (zero overhead)
41//!
42//! The `Partial` state is critical for maintaining transparency when:
43//! - Body size exceeds configured limits but must still be forwarded
44//! - Network or upstream errors occur mid-stream
45//! - Predicates need to inspect body content without blocking large transfers
46
47use bytes::{Buf, Bytes, BytesMut};
48use http_body::{Body as HttpBody, Frame};
49use pin_project::pin_project;
50use std::fmt;
51use std::pin::Pin;
52use std::task::{Context, Poll};
53
54/// What remains of a body stream after partial consumption.
55///
56/// When predicates or extractors read bytes from a body, the stream may have
57/// more data available or may have encountered an error. This enum captures
58/// both possibilities, preserving the stream state for forwarding to upstream.
59///
60/// # When You'll Encounter This
61///
62/// You typically don't create this directly. It appears when:
63/// - Using [`BufferedBody::collect_exact`] which returns remaining stream data
64/// - Decomposing a [`PartialBufferedBody`] via [`into_parts`](PartialBufferedBody::into_parts)
65///
66/// # Invariants
67///
68/// - `Body(stream)`: The stream has not ended and may yield more frames
69/// - `Error(Some(e))`: An error occurred; will be yielded once then become `None`
70/// - `Error(None)`: Error was already yielded; stream is terminated
71///
72/// # Examples
73///
74/// ```no_run
75/// use hitbox_http::{BufferedBody, CollectExactResult, Remaining};
76///
77/// async fn example<B: hyper::body::Body + Unpin>(body: BufferedBody<B>) {
78///     // After collecting 100 bytes from a larger body
79///     let result = body.collect_exact(100).await;
80///     match result {
81///         CollectExactResult::AtLeast { buffered, remaining } => {
82///             match remaining {
83///                 Some(Remaining::Body(stream)) => {
84///                     // More data available in stream
85///                 }
86///                 Some(Remaining::Error(err)) => {
87///                     // Error occurred after collecting bytes
88///                 }
89///                 None => {
90///                     // Stream ended exactly at limit
91///                 }
92///             }
93///         }
94///         CollectExactResult::Incomplete { .. } => {}
95///     }
96/// }
97/// ```
98#[pin_project(project = RemainingProj)]
99#[derive(Debug)]
100pub enum Remaining<B>
101where
102    B: HttpBody,
103{
104    /// The body stream continues with unconsumed data.
105    Body(#[pin] B),
106    /// An error occurred during consumption.
107    ///
108    /// The `Option` allows the error to be yielded once, then `None` on
109    /// subsequent polls.
110    Error(Option<B::Error>),
111}
112
113/// A partially consumed body: buffered prefix plus remaining stream.
114///
115/// Created when a predicate or extractor reads some bytes from a body stream
116/// without consuming it entirely. Implements [`HttpBody`] to transparently
117/// replay the buffered prefix followed by the remaining stream data.
118///
119/// # When You'll Encounter This
120///
121/// You typically don't create this directly. It appears inside
122/// [`BufferedBody::Partial`] after operations like [`collect_exact`](BufferedBody::collect_exact).
123///
124/// # Invariants
125///
126/// - The prefix contains bytes already read from the original stream
127/// - The remaining stream has not been polled since the prefix was extracted
128/// - When polled as `HttpBody`, prefix bytes are yielded before remaining data
129///
130/// # Streaming Behavior
131///
132/// When polled as an [`HttpBody`]:
133/// 1. Yields the buffered prefix (if any) as a single frame
134/// 2. Delegates to the remaining stream, or yields the stored error
135///
136/// # Examples
137///
138/// ```no_run
139/// use bytes::Bytes;
140/// use hitbox_http::{BufferedBody, PartialBufferedBody, Remaining};
141///
142/// fn example<B: hyper::body::Body>(body: BufferedBody<B>) {
143///     // Decompose a partial body
144///     if let BufferedBody::Partial(partial) = body {
145///         let prefix: Option<&Bytes> = partial.prefix();
146///         println!("Buffered {} bytes", prefix.map(|b| b.len()).unwrap_or(0));
147///
148///         let (prefix, remaining) = partial.into_parts();
149///         // Can now handle prefix and remaining separately
150///     }
151/// }
152/// ```
153///
154/// # Performance
155///
156/// The prefix is yielded as a single frame, avoiding per-byte overhead.
157/// The remaining stream is passed through without additional buffering.
158#[pin_project]
159pub struct PartialBufferedBody<B>
160where
161    B: HttpBody,
162{
163    prefix: Option<Bytes>,
164    #[pin]
165    remaining: Remaining<B>,
166}
167
168impl<B> PartialBufferedBody<B>
169where
170    B: HttpBody,
171{
172    /// Constructs a partial body for transparent stream replay.
173    ///
174    /// When this body is polled as [`HttpBody`], the prefix bytes are yielded
175    /// first as a single frame, followed by the remaining stream data (or error).
176    /// This enables predicates to inspect body content without losing data.
177    pub fn new(prefix: Option<Bytes>, remaining: Remaining<B>) -> Self {
178        Self { prefix, remaining }
179    }
180
181    /// Returns the bytes already consumed from the original stream.
182    ///
183    /// These bytes will be replayed before any remaining stream data when
184    /// this body is polled. Returns `None` if no bytes were buffered.
185    pub fn prefix(&self) -> Option<&Bytes> {
186        self.prefix.as_ref()
187    }
188
189    /// Separates the buffered prefix from the remaining stream for independent handling.
190    ///
191    /// Use this when you need to process the prefix and remaining data differently,
192    /// such as forwarding them to separate destinations.
193    pub fn into_parts(self) -> (Option<Bytes>, Remaining<B>) {
194        (self.prefix, self.remaining)
195    }
196}
197
198impl<B: HttpBody> HttpBody for PartialBufferedBody<B> {
199    type Data = Bytes;
200    type Error = B::Error;
201
202    fn poll_frame(
203        self: Pin<&mut Self>,
204        cx: &mut Context<'_>,
205    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
206        let this = self.project();
207
208        // First, yield the prefix if present
209        if let Some(prefix) = this.prefix.take() {
210            return Poll::Ready(Some(Ok(Frame::data(prefix))));
211        }
212
213        // Then handle the remaining body or error
214        match this.remaining.project() {
215            RemainingProj::Body(body) => match body.poll_frame(cx) {
216                Poll::Ready(Some(Ok(frame))) => {
217                    let frame = frame.map_data(|mut data| data.copy_to_bytes(data.remaining()));
218                    Poll::Ready(Some(Ok(frame)))
219                }
220                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
221                Poll::Ready(None) => Poll::Ready(None),
222                Poll::Pending => Poll::Pending,
223            },
224            RemainingProj::Error(error) => {
225                if let Some(err) = error.take() {
226                    Poll::Ready(Some(Err(err)))
227                } else {
228                    Poll::Ready(None)
229                }
230            }
231        }
232    }
233
234    fn size_hint(&self) -> http_body::SizeHint {
235        let prefix_len = self.prefix.as_ref().map(|b| b.len() as u64).unwrap_or(0);
236
237        match &self.remaining {
238            Remaining::Body(body) => {
239                let hint = body.size_hint();
240                let lower = hint.lower().saturating_add(prefix_len);
241
242                // The upper bound needs careful handling:
243                // If we have a prefix, it means we already consumed those bytes from the stream.
244                // The body's upper hint might not have been updated (e.g., if based on Content-Length).
245                // So we need to ensure: lower <= upper
246                let upper = hint.upper().map(|u| {
247                    // Upper should be at least lower to maintain the invariant
248                    u.saturating_add(prefix_len).max(lower)
249                });
250
251                let mut result = http_body::SizeHint::new();
252                result.set_lower(lower);
253                if let Some(u) = upper {
254                    result.set_upper(u);
255                }
256                result
257            }
258            Remaining::Error(_) => http_body::SizeHint::with_exact(prefix_len),
259        }
260    }
261
262    fn is_end_stream(&self) -> bool {
263        if self.prefix.is_some() {
264            return false;
265        }
266
267        match &self.remaining {
268            Remaining::Body(body) => body.is_end_stream(),
269            Remaining::Error(err) => err.is_none(),
270        }
271    }
272}
273
274/// A body wrapper that represents different consumption states.
275///
276/// This enum allows predicates to partially consume request or response bodies
277/// without losing data. The complete body (including any buffered prefix) is
278/// forwarded to upstream services.
279///
280/// # States
281///
282/// - [`Complete`](Self::Complete): Body fully buffered in memory
283/// - [`Partial`](Self::Partial): Prefix buffered, remaining stream preserved
284/// - [`Passthrough`](Self::Passthrough): Untouched, zero overhead
285///
286/// # Examples
287///
288/// Creating a passthrough body for a new request:
289///
290/// ```
291/// use bytes::Bytes;
292/// use http_body_util::Empty;
293/// use hitbox_http::BufferedBody;
294///
295/// let body: BufferedBody<Empty<Bytes>> = BufferedBody::Passthrough(Empty::new());
296/// ```
297///
298/// Creating a complete body from cached data:
299///
300/// ```
301/// use bytes::Bytes;
302/// use http_body_util::Empty;
303/// use hitbox_http::BufferedBody;
304///
305/// let cached_data = Bytes::from_static(b"{\"id\": 42}");
306/// let body: BufferedBody<Empty<Bytes>> = BufferedBody::Complete(Some(cached_data));
307/// ```
308///
309/// # State Transitions
310///
311/// ```text
312/// Passthrough ──collect_exact()──► Partial (if stream continues)
313///      │                               │
314///      │                               ▼
315///      └──────collect()──────────► Complete
316/// ```
317#[pin_project(project = BufferedBodyProj)]
318pub enum BufferedBody<B>
319where
320    B: HttpBody,
321{
322    /// Body was fully read and buffered (within size limits).
323    ///
324    /// The `Option` is used to yield the data once, then return `None` on subsequent polls.
325    Complete(Option<Bytes>),
326
327    /// Body was partially read - contains buffered prefix and remaining stream.
328    ///
329    /// The `PartialBufferedBody` handles streaming of both the prefix and remaining data.
330    Partial(#[pin] PartialBufferedBody<B>),
331
332    /// Body was passed through without reading (untouched).
333    ///
334    /// The body is forwarded directly to upstream without any buffering.
335    Passthrough(#[pin] B),
336}
337
338impl<B> HttpBody for BufferedBody<B>
339where
340    B: HttpBody,
341{
342    type Data = Bytes;
343    type Error = B::Error;
344
345    fn poll_frame(
346        self: Pin<&mut Self>,
347        cx: &mut Context<'_>,
348    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
349        match self.project() {
350            BufferedBodyProj::Complete(data) => {
351                if let Some(bytes) = data.take() {
352                    Poll::Ready(Some(Ok(Frame::data(bytes))))
353                } else {
354                    Poll::Ready(None)
355                }
356            }
357
358            BufferedBodyProj::Partial(partial) => {
359                // Delegate to PartialBody's HttpBody implementation
360                partial.poll_frame(cx)
361            }
362
363            BufferedBodyProj::Passthrough(body) => {
364                // Delegate to the inner body and convert Data type
365                match body.poll_frame(cx) {
366                    Poll::Ready(Some(Ok(frame))) => {
367                        let frame = frame.map_data(|mut data| data.copy_to_bytes(data.remaining()));
368                        Poll::Ready(Some(Ok(frame)))
369                    }
370                    Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
371                    Poll::Ready(None) => Poll::Ready(None),
372                    Poll::Pending => Poll::Pending,
373                }
374            }
375        }
376    }
377
378    fn size_hint(&self) -> http_body::SizeHint {
379        match self {
380            BufferedBody::Complete(Some(bytes)) => {
381                let len = bytes.len() as u64;
382                http_body::SizeHint::with_exact(len)
383            }
384            BufferedBody::Complete(None) => http_body::SizeHint::with_exact(0),
385
386            BufferedBody::Partial(partial) => partial.size_hint(),
387
388            BufferedBody::Passthrough(body) => body.size_hint(),
389        }
390    }
391
392    fn is_end_stream(&self) -> bool {
393        match self {
394            BufferedBody::Complete(None) => true,
395            BufferedBody::Complete(Some(_)) => false,
396
397            BufferedBody::Partial(partial) => partial.is_end_stream(),
398
399            BufferedBody::Passthrough(body) => body.is_end_stream(),
400        }
401    }
402}
403
404/// Result of attempting to collect at least N bytes from a body.
405///
406/// Returned by [`BufferedBody::collect_exact`] to indicate whether the
407/// requested number of bytes was successfully read from the stream.
408///
409/// # When to Use
410///
411/// Use this when you need to inspect a fixed-size prefix of a body without
412/// consuming the entire stream:
413/// - Checking magic bytes for file type detection
414/// - Reading protocol headers
415/// - Validating body format before full processing
416///
417/// # Invariants
418///
419/// - `AtLeast`: `buffered.len() >= requested_bytes`
420/// - `Incomplete`: `buffered.len() < requested_bytes` (stream ended or error)
421/// - The buffered data may exceed the requested size if a frame boundary
422///   didn't align exactly
423///
424/// # Examples
425///
426/// ```no_run
427/// use hitbox_http::{BufferedBody, CollectExactResult};
428///
429/// async fn example<B: hyper::body::Body + Unpin>(body: BufferedBody<B>) {
430///     // Check if body starts with JSON array
431///     let result = body.collect_exact(1).await;
432///     match result {
433///         CollectExactResult::AtLeast { ref buffered, .. } => {
434///             if buffered.starts_with(b"[") {
435///                 // It's a JSON array, reconstruct body for further processing
436///                 let body = result.into_buffered_body();
437///             }
438///         }
439///         CollectExactResult::Incomplete { buffered, error } => {
440///             // Body was empty or error occurred
441///         }
442///     }
443/// }
444/// ```
445#[derive(Debug)]
446pub enum CollectExactResult<B: HttpBody> {
447    /// Successfully collected at least the requested number of bytes.
448    ///
449    /// The buffered bytes contains at least the requested amount (possibly more
450    /// if a frame was consumed). The remaining field contains either:
451    /// - `Some(Remaining::Body(stream))` - more data to stream
452    /// - `Some(Remaining::Error(err))` - error occurred after collecting enough bytes
453    /// - `None` - stream ended cleanly
454    AtLeast {
455        /// The bytes successfully read from the stream (at least `limit_bytes`).
456        buffered: Bytes,
457        /// The remaining stream data, if any.
458        remaining: Option<Remaining<B>>,
459    },
460
461    /// Failed to collect the requested bytes.
462    ///
463    /// This occurs when either:
464    /// - The body stream ended before reaching the requested number of bytes (error is None)
465    /// - An error occurred while reading the stream (error is Some)
466    ///
467    /// The buffered field contains any bytes successfully read before the failure.
468    Incomplete {
469        /// Bytes read before the stream ended or error occurred.
470        buffered: Option<Bytes>,
471        /// The error that occurred, if any.
472        error: Option<B::Error>,
473    },
474}
475
476impl<B: HttpBody> CollectExactResult<B> {
477    /// Converts the result into a [`BufferedBody`], using the buffered data as prefix.
478    ///
479    /// This reconstructs the body:
480    /// - `AtLeast { buffered, remaining }` → `BufferedBody::Partial` with buffered as prefix and remaining, or `BufferedBody::Complete` if no remaining
481    /// - `Incomplete { buffered, error }` → `BufferedBody::Partial` with error, or `BufferedBody::Complete` if no error
482    pub fn into_buffered_body(self) -> BufferedBody<B> {
483        match self {
484            CollectExactResult::AtLeast {
485                buffered,
486                remaining,
487            } => match remaining {
488                Some(rem) => BufferedBody::Partial(PartialBufferedBody::new(Some(buffered), rem)),
489                None => BufferedBody::Complete(Some(buffered)),
490            },
491            CollectExactResult::Incomplete { buffered, error } => match error {
492                Some(err) => BufferedBody::Partial(PartialBufferedBody::new(
493                    buffered,
494                    Remaining::Error(Some(err)),
495                )),
496                None => BufferedBody::Complete(buffered),
497            },
498        }
499    }
500}
501
502/// Helper function to combine an optional prefix with new data.
503///
504/// This is used when buffering partial bodies - we may have already consumed
505/// a prefix from the stream, and now need to combine it with newly read data.
506fn combine_bytes(prefix: Option<Bytes>, data: Bytes) -> Bytes {
507    match prefix {
508        Some(prefix_bytes) if !data.is_empty() => {
509            let mut buf = BytesMut::from(prefix_bytes.as_ref());
510            buf.extend_from_slice(&data);
511            buf.freeze()
512        }
513        Some(prefix_bytes) => prefix_bytes,
514        None => data,
515    }
516}
517
518/// Internal result type for the low-level stream collection function.
519impl<B> BufferedBody<B>
520where
521    B: HttpBody,
522{
523    /// Collects the entire body into memory.
524    ///
525    /// Consumes all remaining bytes from the stream and returns them as a
526    /// contiguous `Bytes` buffer.
527    ///
528    /// # Examples
529    ///
530    /// ```no_run
531    /// use hitbox_http::BufferedBody;
532    ///
533    /// async fn example<B: hyper::body::Body>(body: BufferedBody<B>)
534    /// where
535    ///     B::Data: Send,
536    /// {
537    ///     match body.collect().await {
538    ///         Ok(bytes) => println!("Collected {} bytes", bytes.len()),
539    ///         Err(error_body) => {
540    ///             // Error occurred, but we still have the body for forwarding
541    ///         }
542    ///     }
543    /// }
544    /// ```
545    ///
546    /// # Errors
547    ///
548    /// Returns `Err(BufferedBody::Partial(...))` if the underlying stream
549    /// yields an error. The error is preserved in the returned body so it
550    /// can be forwarded to upstream services.
551    ///
552    /// # Performance
553    ///
554    /// Allocates a buffer to hold the entire body. For large bodies, consider:
555    /// - Using [`collect_exact`](Self::collect_exact) to read only a prefix
556    /// - Streaming the body directly without buffering
557    ///
558    /// # Caveats
559    ///
560    /// This method blocks until the entire body is received. For very large
561    /// bodies or slow streams, this may take significant time and memory.
562    pub async fn collect(self) -> Result<Bytes, Self>
563    where
564        B::Data: Send,
565    {
566        use http_body_util::BodyExt;
567
568        match self {
569            // Already complete, extract bytes
570            BufferedBody::Complete(Some(bytes)) => Ok(bytes),
571            BufferedBody::Complete(None) => Ok(Bytes::new()),
572
573            // Passthrough - need to collect
574            BufferedBody::Passthrough(body) => match body.collect().await {
575                Ok(collected) => Ok(collected.to_bytes()),
576                Err(err) => Err(BufferedBody::Partial(PartialBufferedBody::new(
577                    None,
578                    Remaining::Error(Some(err)),
579                ))),
580            },
581
582            // Partial - delegate to PartialBody which implements HttpBody
583            BufferedBody::Partial(partial) => {
584                let (prefix, remaining) = partial.into_parts();
585                match remaining {
586                    Remaining::Body(body) => match body.collect().await {
587                        Ok(collected) => {
588                            if let Some(prefix_bytes) = prefix {
589                                let mut combined = BytesMut::from(prefix_bytes.as_ref());
590                                combined.extend_from_slice(&collected.to_bytes());
591                                Ok(combined.freeze())
592                            } else {
593                                Ok(collected.to_bytes())
594                            }
595                        }
596                        Err(err) => Err(BufferedBody::Partial(PartialBufferedBody::new(
597                            prefix,
598                            Remaining::Error(Some(err)),
599                        ))),
600                    },
601                    Remaining::Error(err) => Err(BufferedBody::Partial(PartialBufferedBody::new(
602                        prefix,
603                        Remaining::Error(err),
604                    ))),
605                }
606            }
607        }
608    }
609
610    /// Collects at least `limit_bytes` from the body, preserving the rest.
611    ///
612    /// Reads bytes from the stream until at least `limit_bytes` are buffered,
613    /// then returns both the buffered prefix and the remaining stream. This
614    /// enables inspecting a body prefix without consuming the entire stream.
615    ///
616    /// # Examples
617    ///
618    /// ```no_run
619    /// use hitbox_http::{BufferedBody, CollectExactResult};
620    ///
621    /// async fn check_json_array<B: hyper::body::Body + Unpin>(
622    ///     body: BufferedBody<B>,
623    /// ) -> bool {
624    ///     match body.collect_exact(1).await {
625    ///         CollectExactResult::AtLeast { buffered, .. } => {
626    ///             buffered.starts_with(b"[")
627    ///         }
628    ///         CollectExactResult::Incomplete { .. } => false,
629    ///     }
630    /// }
631    /// ```
632    ///
633    /// # Returns
634    ///
635    /// - [`AtLeast`](CollectExactResult::AtLeast): Collected `>= limit_bytes`; remaining stream preserved
636    /// - [`Incomplete`](CollectExactResult::Incomplete): Stream ended or error before reaching limit
637    ///
638    /// # Errors
639    ///
640    /// Stream errors are captured in [`CollectExactResult::Incomplete`] with the
641    /// error in the `error` field. Any bytes read before the error are preserved
642    /// in `buffered`.
643    ///
644    /// # Performance
645    ///
646    /// Only allocates for the prefix buffer (up to `limit_bytes` plus one frame).
647    /// The remaining stream is preserved without additional buffering.
648    ///
649    /// # Use Cases
650    ///
651    /// - Checking magic bytes for file type detection
652    /// - Reading fixed-size protocol headers
653    /// - Validating body format before full processing
654    /// - JQ/regex predicates that need body content
655    pub async fn collect_exact(self, limit_bytes: usize) -> CollectExactResult<B>
656    where
657        B: Unpin,
658    {
659        match self {
660            // Already complete - check if we have enough bytes
661            BufferedBody::Complete(Some(data)) => {
662                if data.len() >= limit_bytes {
663                    // Have at least limit_bytes, stream ended cleanly
664                    CollectExactResult::AtLeast {
665                        buffered: data,
666                        remaining: None,
667                    }
668                } else {
669                    // Not enough bytes
670                    CollectExactResult::Incomplete {
671                        buffered: Some(data),
672                        error: None,
673                    }
674                }
675            }
676            BufferedBody::Complete(None) => {
677                // Empty body
678                CollectExactResult::Incomplete {
679                    buffered: None,
680                    error: None,
681                }
682            }
683
684            // Partial - combine prefix with remaining stream
685            BufferedBody::Partial(partial) => {
686                let (prefix, remaining) = partial.into_parts();
687
688                match prefix {
689                    Some(buffered) if buffered.len() >= limit_bytes => {
690                        // Prefix already has enough bytes - preserve the remaining state
691                        CollectExactResult::AtLeast {
692                            buffered,
693                            remaining: Some(remaining),
694                        }
695                    }
696                    prefix => {
697                        // Need to read more from remaining stream
698                        let prefix_len = prefix.as_ref().map(|p| p.len()).unwrap_or(0);
699                        match remaining {
700                            Remaining::Body(stream) => {
701                                // Read more bytes from stream
702                                let needed = limit_bytes - prefix_len;
703                                let result = collect_exact_from_stream(stream, needed).await;
704                                match result {
705                                    CollectExactResult::AtLeast {
706                                        buffered: new_bytes,
707                                        remaining,
708                                    } => {
709                                        let combined = combine_bytes(prefix, new_bytes);
710                                        CollectExactResult::AtLeast {
711                                            buffered: combined,
712                                            remaining,
713                                        }
714                                    }
715                                    CollectExactResult::Incomplete {
716                                        buffered: new_bytes,
717                                        error,
718                                    } => {
719                                        let combined = if let Some(new) = new_bytes {
720                                            Some(combine_bytes(prefix, new))
721                                        } else {
722                                            prefix
723                                        };
724                                        CollectExactResult::Incomplete {
725                                            buffered: combined,
726                                            error,
727                                        }
728                                    }
729                                }
730                            }
731                            Remaining::Error(error) => {
732                                // Already have an error, can't read more
733                                CollectExactResult::Incomplete {
734                                    buffered: prefix,
735                                    error,
736                                }
737                            }
738                        }
739                    }
740                }
741            }
742
743            // Passthrough - read from stream
744            BufferedBody::Passthrough(stream) => {
745                collect_exact_from_stream(stream, limit_bytes).await
746            }
747        }
748    }
749}
750
751/// Helper function to collect exactly N bytes from a stream.
752async fn collect_exact_from_stream<B>(mut stream: B, limit_bytes: usize) -> CollectExactResult<B>
753where
754    B: HttpBody + Unpin,
755{
756    use http_body_util::BodyExt;
757
758    let mut buffer = BytesMut::new();
759
760    // Read until we have at least limit_bytes
761    while buffer.len() < limit_bytes {
762        match stream.frame().await {
763            Some(Ok(frame)) => {
764                if let Ok(mut data) = frame.into_data() {
765                    buffer.extend_from_slice(&data.copy_to_bytes(data.remaining()));
766                }
767            }
768            Some(Err(error)) => {
769                // Error while reading
770                return CollectExactResult::Incomplete {
771                    buffered: if buffer.is_empty() {
772                        None
773                    } else {
774                        Some(buffer.freeze())
775                    },
776                    error: Some(error),
777                };
778            }
779            None => {
780                // Stream ended before we got limit_bytes
781                return CollectExactResult::Incomplete {
782                    buffered: if buffer.is_empty() {
783                        None
784                    } else {
785                        Some(buffer.freeze())
786                    },
787                    error: None,
788                };
789            }
790        }
791    }
792
793    // We have at least limit_bytes
794    // Return the buffered data and the remaining stream
795    CollectExactResult::AtLeast {
796        buffered: buffer.freeze(),
797        remaining: Some(Remaining::Body(stream)),
798    }
799}
800
801impl<B> fmt::Debug for BufferedBody<B>
802where
803    B: HttpBody,
804{
805    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
806        match self {
807            BufferedBody::Complete(Some(bytes)) => f
808                .debug_tuple("Complete")
809                .field(&format!("{} bytes", bytes.len()))
810                .finish(),
811            BufferedBody::Complete(None) => f.debug_tuple("Complete").field(&"consumed").finish(),
812            BufferedBody::Partial(partial) => {
813                let prefix_len = partial.prefix().map(|b| b.len()).unwrap_or(0);
814                f.debug_struct("Partial")
815                    .field("prefix_len", &prefix_len)
816                    .field("remaining", &"...")
817                    .finish()
818            }
819            BufferedBody::Passthrough(_) => f.debug_tuple("Passthrough").field(&"...").finish(),
820        }
821    }
822}