hitbox_http/body.rs
1//! HTTP body buffering and streaming utilities for transparent caching.
2//!
3//! # Design Rationale: Cache Layer Transparency
4//!
5//! Hitbox aims to be a transparent caching layer - upstream services and clients
6//! should see the same behavior with or without the cache.
7//!
8//! ## Why `BufferedBody::Partial` exists
9//!
10//! When predicates inspect request/response bodies, they consume bytes from the stream.
11//! To maintain transparency, these bytes must be forwarded to upstream. Since you can't
12//! "un-read" from a stream, we must:
13//!
14//! 1. Buffer the consumed prefix
15//! 2. Preserve the unconsumed remaining stream
16//! 3. Replay prefix + remaining to upstream
17//!
18//! This enables:
19//! - Resumable uploads/downloads (e.g., large file transfers)
20//! - Accurate error reporting (errors occur at the same byte position)
21//! - Zero data loss or corruption
22//! - Support for partial transfer protocols (HTTP Range requests, etc.)
23//!
24//! ## Example: Large File Upload
25//!
26//! ```text
27//! Without cache:
28//! Client → Upstream: 500MB uploaded, error at byte 300MB
29//!
30//! With transparent cache:
31//! Client → Hitbox (reads 10MB for predicate) → Upstream
32//! Upstream receives: 10MB (replayed) + 290MB (streamed) + error
33//! Total: Same 300MB, same error position ✅
34//! ```
35//!
36//! ## Body States
37//!
38//! - **Complete**: Body was fully read and buffered (within configured size limits)
39//! - **Partial**: Body was partially read - contains buffered prefix plus remaining stream or error
40//! - **Passthrough**: Body was not inspected at all (zero overhead)
41//!
42//! The `Partial` state is critical for maintaining transparency when:
43//! - Body size exceeds configured limits but must still be forwarded
44//! - Network or upstream errors occur mid-stream
45//! - Predicates need to inspect body content without blocking large transfers
46
47use bytes::{Buf, Bytes, BytesMut};
48use http_body::{Body as HttpBody, Frame};
49use pin_project::pin_project;
50use std::fmt;
51use std::pin::Pin;
52use std::task::{Context, Poll};
53
54/// What remains of a body stream after partial consumption.
55///
56/// When predicates or extractors read bytes from a body, the stream may have
57/// more data available or may have encountered an error. This enum captures
58/// both possibilities, preserving the stream state for forwarding to upstream.
59///
60/// # When You'll Encounter This
61///
62/// You typically don't create this directly. It appears when:
63/// - Using [`BufferedBody::collect_exact`] which returns remaining stream data
64/// - Decomposing a [`PartialBufferedBody`] via [`into_parts`](PartialBufferedBody::into_parts)
65///
66/// # Invariants
67///
68/// - `Body(stream)`: The stream has not ended and may yield more frames
69/// - `Error(Some(e))`: An error occurred; will be yielded once then become `None`
70/// - `Error(None)`: Error was already yielded; stream is terminated
71///
72/// # Examples
73///
74/// ```no_run
75/// use hitbox_http::{BufferedBody, CollectExactResult, Remaining};
76///
77/// async fn example<B: hyper::body::Body + Unpin>(body: BufferedBody<B>) {
78/// // After collecting 100 bytes from a larger body
79/// let result = body.collect_exact(100).await;
80/// match result {
81/// CollectExactResult::AtLeast { buffered, remaining } => {
82/// match remaining {
83/// Some(Remaining::Body(stream)) => {
84/// // More data available in stream
85/// }
86/// Some(Remaining::Error(err)) => {
87/// // Error occurred after collecting bytes
88/// }
89/// None => {
90/// // Stream ended exactly at limit
91/// }
92/// }
93/// }
94/// CollectExactResult::Incomplete { .. } => {}
95/// }
96/// }
97/// ```
98#[pin_project(project = RemainingProj)]
99#[derive(Debug)]
100pub enum Remaining<B>
101where
102 B: HttpBody,
103{
104 /// The body stream continues with unconsumed data.
105 Body(#[pin] B),
106 /// An error occurred during consumption.
107 ///
108 /// The `Option` allows the error to be yielded once, then `None` on
109 /// subsequent polls.
110 Error(Option<B::Error>),
111}
112
113/// A partially consumed body: buffered prefix plus remaining stream.
114///
115/// Created when a predicate or extractor reads some bytes from a body stream
116/// without consuming it entirely. Implements [`HttpBody`] to transparently
117/// replay the buffered prefix followed by the remaining stream data.
118///
119/// # When You'll Encounter This
120///
121/// You typically don't create this directly. It appears inside
122/// [`BufferedBody::Partial`] after operations like [`collect_exact`](BufferedBody::collect_exact).
123///
124/// # Invariants
125///
126/// - The prefix contains bytes already read from the original stream
127/// - The remaining stream has not been polled since the prefix was extracted
128/// - When polled as `HttpBody`, prefix bytes are yielded before remaining data
129///
130/// # Streaming Behavior
131///
132/// When polled as an [`HttpBody`]:
133/// 1. Yields the buffered prefix (if any) as a single frame
134/// 2. Delegates to the remaining stream, or yields the stored error
135///
136/// # Examples
137///
138/// ```no_run
139/// use bytes::Bytes;
140/// use hitbox_http::{BufferedBody, PartialBufferedBody, Remaining};
141///
142/// fn example<B: hyper::body::Body>(body: BufferedBody<B>) {
143/// // Decompose a partial body
144/// if let BufferedBody::Partial(partial) = body {
145/// let prefix: Option<&Bytes> = partial.prefix();
146/// println!("Buffered {} bytes", prefix.map(|b| b.len()).unwrap_or(0));
147///
148/// let (prefix, remaining) = partial.into_parts();
149/// // Can now handle prefix and remaining separately
150/// }
151/// }
152/// ```
153///
154/// # Performance
155///
156/// The prefix is yielded as a single frame, avoiding per-byte overhead.
157/// The remaining stream is passed through without additional buffering.
158#[pin_project]
159pub struct PartialBufferedBody<B>
160where
161 B: HttpBody,
162{
163 prefix: Option<Bytes>,
164 #[pin]
165 remaining: Remaining<B>,
166}
167
168impl<B> PartialBufferedBody<B>
169where
170 B: HttpBody,
171{
172 /// Constructs a partial body for transparent stream replay.
173 ///
174 /// When this body is polled as [`HttpBody`], the prefix bytes are yielded
175 /// first as a single frame, followed by the remaining stream data (or error).
176 /// This enables predicates to inspect body content without losing data.
177 pub fn new(prefix: Option<Bytes>, remaining: Remaining<B>) -> Self {
178 Self { prefix, remaining }
179 }
180
181 /// Returns the bytes already consumed from the original stream.
182 ///
183 /// These bytes will be replayed before any remaining stream data when
184 /// this body is polled. Returns `None` if no bytes were buffered.
185 pub fn prefix(&self) -> Option<&Bytes> {
186 self.prefix.as_ref()
187 }
188
189 /// Separates the buffered prefix from the remaining stream for independent handling.
190 ///
191 /// Use this when you need to process the prefix and remaining data differently,
192 /// such as forwarding them to separate destinations.
193 pub fn into_parts(self) -> (Option<Bytes>, Remaining<B>) {
194 (self.prefix, self.remaining)
195 }
196}
197
198impl<B: HttpBody> HttpBody for PartialBufferedBody<B> {
199 type Data = Bytes;
200 type Error = B::Error;
201
202 fn poll_frame(
203 self: Pin<&mut Self>,
204 cx: &mut Context<'_>,
205 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
206 let this = self.project();
207
208 // First, yield the prefix if present
209 if let Some(prefix) = this.prefix.take() {
210 return Poll::Ready(Some(Ok(Frame::data(prefix))));
211 }
212
213 // Then handle the remaining body or error
214 match this.remaining.project() {
215 RemainingProj::Body(body) => match body.poll_frame(cx) {
216 Poll::Ready(Some(Ok(frame))) => {
217 let frame = frame.map_data(|mut data| data.copy_to_bytes(data.remaining()));
218 Poll::Ready(Some(Ok(frame)))
219 }
220 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
221 Poll::Ready(None) => Poll::Ready(None),
222 Poll::Pending => Poll::Pending,
223 },
224 RemainingProj::Error(error) => {
225 if let Some(err) = error.take() {
226 Poll::Ready(Some(Err(err)))
227 } else {
228 Poll::Ready(None)
229 }
230 }
231 }
232 }
233
234 fn size_hint(&self) -> http_body::SizeHint {
235 let prefix_len = self.prefix.as_ref().map(|b| b.len() as u64).unwrap_or(0);
236
237 match &self.remaining {
238 Remaining::Body(body) => {
239 let hint = body.size_hint();
240 let lower = hint.lower().saturating_add(prefix_len);
241
242 // The upper bound needs careful handling:
243 // If we have a prefix, it means we already consumed those bytes from the stream.
244 // The body's upper hint might not have been updated (e.g., if based on Content-Length).
245 // So we need to ensure: lower <= upper
246 let upper = hint.upper().map(|u| {
247 // Upper should be at least lower to maintain the invariant
248 u.saturating_add(prefix_len).max(lower)
249 });
250
251 let mut result = http_body::SizeHint::new();
252 result.set_lower(lower);
253 if let Some(u) = upper {
254 result.set_upper(u);
255 }
256 result
257 }
258 Remaining::Error(_) => http_body::SizeHint::with_exact(prefix_len),
259 }
260 }
261
262 fn is_end_stream(&self) -> bool {
263 if self.prefix.is_some() {
264 return false;
265 }
266
267 match &self.remaining {
268 Remaining::Body(body) => body.is_end_stream(),
269 Remaining::Error(err) => err.is_none(),
270 }
271 }
272}
273
274/// A body wrapper that represents different consumption states.
275///
276/// This enum allows predicates to partially consume request or response bodies
277/// without losing data. The complete body (including any buffered prefix) is
278/// forwarded to upstream services.
279///
280/// # States
281///
282/// - [`Complete`](Self::Complete): Body fully buffered in memory
283/// - [`Partial`](Self::Partial): Prefix buffered, remaining stream preserved
284/// - [`Passthrough`](Self::Passthrough): Untouched, zero overhead
285///
286/// # Examples
287///
288/// Creating a passthrough body for a new request:
289///
290/// ```
291/// use bytes::Bytes;
292/// use http_body_util::Empty;
293/// use hitbox_http::BufferedBody;
294///
295/// let body: BufferedBody<Empty<Bytes>> = BufferedBody::Passthrough(Empty::new());
296/// ```
297///
298/// Creating a complete body from cached data:
299///
300/// ```
301/// use bytes::Bytes;
302/// use http_body_util::Empty;
303/// use hitbox_http::BufferedBody;
304///
305/// let cached_data = Bytes::from_static(b"{\"id\": 42}");
306/// let body: BufferedBody<Empty<Bytes>> = BufferedBody::Complete(Some(cached_data));
307/// ```
308///
309/// # State Transitions
310///
311/// ```text
312/// Passthrough ──collect_exact()──► Partial (if stream continues)
313/// │ │
314/// │ ▼
315/// └──────collect()──────────► Complete
316/// ```
317#[pin_project(project = BufferedBodyProj)]
318pub enum BufferedBody<B>
319where
320 B: HttpBody,
321{
322 /// Body was fully read and buffered (within size limits).
323 ///
324 /// The `Option` is used to yield the data once, then return `None` on subsequent polls.
325 Complete(Option<Bytes>),
326
327 /// Body was partially read - contains buffered prefix and remaining stream.
328 ///
329 /// The `PartialBufferedBody` handles streaming of both the prefix and remaining data.
330 Partial(#[pin] PartialBufferedBody<B>),
331
332 /// Body was passed through without reading (untouched).
333 ///
334 /// The body is forwarded directly to upstream without any buffering.
335 Passthrough(#[pin] B),
336}
337
338impl<B> HttpBody for BufferedBody<B>
339where
340 B: HttpBody,
341{
342 type Data = Bytes;
343 type Error = B::Error;
344
345 fn poll_frame(
346 self: Pin<&mut Self>,
347 cx: &mut Context<'_>,
348 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
349 match self.project() {
350 BufferedBodyProj::Complete(data) => {
351 if let Some(bytes) = data.take() {
352 Poll::Ready(Some(Ok(Frame::data(bytes))))
353 } else {
354 Poll::Ready(None)
355 }
356 }
357
358 BufferedBodyProj::Partial(partial) => {
359 // Delegate to PartialBody's HttpBody implementation
360 partial.poll_frame(cx)
361 }
362
363 BufferedBodyProj::Passthrough(body) => {
364 // Delegate to the inner body and convert Data type
365 match body.poll_frame(cx) {
366 Poll::Ready(Some(Ok(frame))) => {
367 let frame = frame.map_data(|mut data| data.copy_to_bytes(data.remaining()));
368 Poll::Ready(Some(Ok(frame)))
369 }
370 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
371 Poll::Ready(None) => Poll::Ready(None),
372 Poll::Pending => Poll::Pending,
373 }
374 }
375 }
376 }
377
378 fn size_hint(&self) -> http_body::SizeHint {
379 match self {
380 BufferedBody::Complete(Some(bytes)) => {
381 let len = bytes.len() as u64;
382 http_body::SizeHint::with_exact(len)
383 }
384 BufferedBody::Complete(None) => http_body::SizeHint::with_exact(0),
385
386 BufferedBody::Partial(partial) => partial.size_hint(),
387
388 BufferedBody::Passthrough(body) => body.size_hint(),
389 }
390 }
391
392 fn is_end_stream(&self) -> bool {
393 match self {
394 BufferedBody::Complete(None) => true,
395 BufferedBody::Complete(Some(_)) => false,
396
397 BufferedBody::Partial(partial) => partial.is_end_stream(),
398
399 BufferedBody::Passthrough(body) => body.is_end_stream(),
400 }
401 }
402}
403
404/// Result of attempting to collect at least N bytes from a body.
405///
406/// Returned by [`BufferedBody::collect_exact`] to indicate whether the
407/// requested number of bytes was successfully read from the stream.
408///
409/// # When to Use
410///
411/// Use this when you need to inspect a fixed-size prefix of a body without
412/// consuming the entire stream:
413/// - Checking magic bytes for file type detection
414/// - Reading protocol headers
415/// - Validating body format before full processing
416///
417/// # Invariants
418///
419/// - `AtLeast`: `buffered.len() >= requested_bytes`
420/// - `Incomplete`: `buffered.len() < requested_bytes` (stream ended or error)
421/// - The buffered data may exceed the requested size if a frame boundary
422/// didn't align exactly
423///
424/// # Examples
425///
426/// ```no_run
427/// use hitbox_http::{BufferedBody, CollectExactResult};
428///
429/// async fn example<B: hyper::body::Body + Unpin>(body: BufferedBody<B>) {
430/// // Check if body starts with JSON array
431/// let result = body.collect_exact(1).await;
432/// match result {
433/// CollectExactResult::AtLeast { ref buffered, .. } => {
434/// if buffered.starts_with(b"[") {
435/// // It's a JSON array, reconstruct body for further processing
436/// let body = result.into_buffered_body();
437/// }
438/// }
439/// CollectExactResult::Incomplete { buffered, error } => {
440/// // Body was empty or error occurred
441/// }
442/// }
443/// }
444/// ```
445#[derive(Debug)]
446pub enum CollectExactResult<B: HttpBody> {
447 /// Successfully collected at least the requested number of bytes.
448 ///
449 /// The buffered bytes contains at least the requested amount (possibly more
450 /// if a frame was consumed). The remaining field contains either:
451 /// - `Some(Remaining::Body(stream))` - more data to stream
452 /// - `Some(Remaining::Error(err))` - error occurred after collecting enough bytes
453 /// - `None` - stream ended cleanly
454 AtLeast {
455 /// The bytes successfully read from the stream (at least `limit_bytes`).
456 buffered: Bytes,
457 /// The remaining stream data, if any.
458 remaining: Option<Remaining<B>>,
459 },
460
461 /// Failed to collect the requested bytes.
462 ///
463 /// This occurs when either:
464 /// - The body stream ended before reaching the requested number of bytes (error is None)
465 /// - An error occurred while reading the stream (error is Some)
466 ///
467 /// The buffered field contains any bytes successfully read before the failure.
468 Incomplete {
469 /// Bytes read before the stream ended or error occurred.
470 buffered: Option<Bytes>,
471 /// The error that occurred, if any.
472 error: Option<B::Error>,
473 },
474}
475
476impl<B: HttpBody> CollectExactResult<B> {
477 /// Converts the result into a [`BufferedBody`], using the buffered data as prefix.
478 ///
479 /// This reconstructs the body:
480 /// - `AtLeast { buffered, remaining }` → `BufferedBody::Partial` with buffered as prefix and remaining, or `BufferedBody::Complete` if no remaining
481 /// - `Incomplete { buffered, error }` → `BufferedBody::Partial` with error, or `BufferedBody::Complete` if no error
482 pub fn into_buffered_body(self) -> BufferedBody<B> {
483 match self {
484 CollectExactResult::AtLeast {
485 buffered,
486 remaining,
487 } => match remaining {
488 Some(rem) => BufferedBody::Partial(PartialBufferedBody::new(Some(buffered), rem)),
489 None => BufferedBody::Complete(Some(buffered)),
490 },
491 CollectExactResult::Incomplete { buffered, error } => match error {
492 Some(err) => BufferedBody::Partial(PartialBufferedBody::new(
493 buffered,
494 Remaining::Error(Some(err)),
495 )),
496 None => BufferedBody::Complete(buffered),
497 },
498 }
499 }
500}
501
502/// Helper function to combine an optional prefix with new data.
503///
504/// This is used when buffering partial bodies - we may have already consumed
505/// a prefix from the stream, and now need to combine it with newly read data.
506fn combine_bytes(prefix: Option<Bytes>, data: Bytes) -> Bytes {
507 match prefix {
508 Some(prefix_bytes) if !data.is_empty() => {
509 let mut buf = BytesMut::from(prefix_bytes.as_ref());
510 buf.extend_from_slice(&data);
511 buf.freeze()
512 }
513 Some(prefix_bytes) => prefix_bytes,
514 None => data,
515 }
516}
517
518/// Internal result type for the low-level stream collection function.
519impl<B> BufferedBody<B>
520where
521 B: HttpBody,
522{
523 /// Collects the entire body into memory.
524 ///
525 /// Consumes all remaining bytes from the stream and returns them as a
526 /// contiguous `Bytes` buffer.
527 ///
528 /// # Examples
529 ///
530 /// ```no_run
531 /// use hitbox_http::BufferedBody;
532 ///
533 /// async fn example<B: hyper::body::Body>(body: BufferedBody<B>)
534 /// where
535 /// B::Data: Send,
536 /// {
537 /// match body.collect().await {
538 /// Ok(bytes) => println!("Collected {} bytes", bytes.len()),
539 /// Err(error_body) => {
540 /// // Error occurred, but we still have the body for forwarding
541 /// }
542 /// }
543 /// }
544 /// ```
545 ///
546 /// # Errors
547 ///
548 /// Returns `Err(BufferedBody::Partial(...))` if the underlying stream
549 /// yields an error. The error is preserved in the returned body so it
550 /// can be forwarded to upstream services.
551 ///
552 /// # Performance
553 ///
554 /// Allocates a buffer to hold the entire body. For large bodies, consider:
555 /// - Using [`collect_exact`](Self::collect_exact) to read only a prefix
556 /// - Streaming the body directly without buffering
557 ///
558 /// # Caveats
559 ///
560 /// This method blocks until the entire body is received. For very large
561 /// bodies or slow streams, this may take significant time and memory.
562 pub async fn collect(self) -> Result<Bytes, Self>
563 where
564 B::Data: Send,
565 {
566 use http_body_util::BodyExt;
567
568 match self {
569 // Already complete, extract bytes
570 BufferedBody::Complete(Some(bytes)) => Ok(bytes),
571 BufferedBody::Complete(None) => Ok(Bytes::new()),
572
573 // Passthrough - need to collect
574 BufferedBody::Passthrough(body) => match body.collect().await {
575 Ok(collected) => Ok(collected.to_bytes()),
576 Err(err) => Err(BufferedBody::Partial(PartialBufferedBody::new(
577 None,
578 Remaining::Error(Some(err)),
579 ))),
580 },
581
582 // Partial - delegate to PartialBody which implements HttpBody
583 BufferedBody::Partial(partial) => {
584 let (prefix, remaining) = partial.into_parts();
585 match remaining {
586 Remaining::Body(body) => match body.collect().await {
587 Ok(collected) => {
588 if let Some(prefix_bytes) = prefix {
589 let mut combined = BytesMut::from(prefix_bytes.as_ref());
590 combined.extend_from_slice(&collected.to_bytes());
591 Ok(combined.freeze())
592 } else {
593 Ok(collected.to_bytes())
594 }
595 }
596 Err(err) => Err(BufferedBody::Partial(PartialBufferedBody::new(
597 prefix,
598 Remaining::Error(Some(err)),
599 ))),
600 },
601 Remaining::Error(err) => Err(BufferedBody::Partial(PartialBufferedBody::new(
602 prefix,
603 Remaining::Error(err),
604 ))),
605 }
606 }
607 }
608 }
609
610 /// Collects at least `limit_bytes` from the body, preserving the rest.
611 ///
612 /// Reads bytes from the stream until at least `limit_bytes` are buffered,
613 /// then returns both the buffered prefix and the remaining stream. This
614 /// enables inspecting a body prefix without consuming the entire stream.
615 ///
616 /// # Examples
617 ///
618 /// ```no_run
619 /// use hitbox_http::{BufferedBody, CollectExactResult};
620 ///
621 /// async fn check_json_array<B: hyper::body::Body + Unpin>(
622 /// body: BufferedBody<B>,
623 /// ) -> bool {
624 /// match body.collect_exact(1).await {
625 /// CollectExactResult::AtLeast { buffered, .. } => {
626 /// buffered.starts_with(b"[")
627 /// }
628 /// CollectExactResult::Incomplete { .. } => false,
629 /// }
630 /// }
631 /// ```
632 ///
633 /// # Returns
634 ///
635 /// - [`AtLeast`](CollectExactResult::AtLeast): Collected `>= limit_bytes`; remaining stream preserved
636 /// - [`Incomplete`](CollectExactResult::Incomplete): Stream ended or error before reaching limit
637 ///
638 /// # Errors
639 ///
640 /// Stream errors are captured in [`CollectExactResult::Incomplete`] with the
641 /// error in the `error` field. Any bytes read before the error are preserved
642 /// in `buffered`.
643 ///
644 /// # Performance
645 ///
646 /// Only allocates for the prefix buffer (up to `limit_bytes` plus one frame).
647 /// The remaining stream is preserved without additional buffering.
648 ///
649 /// # Use Cases
650 ///
651 /// - Checking magic bytes for file type detection
652 /// - Reading fixed-size protocol headers
653 /// - Validating body format before full processing
654 /// - JQ/regex predicates that need body content
655 pub async fn collect_exact(self, limit_bytes: usize) -> CollectExactResult<B>
656 where
657 B: Unpin,
658 {
659 match self {
660 // Already complete - check if we have enough bytes
661 BufferedBody::Complete(Some(data)) => {
662 if data.len() >= limit_bytes {
663 // Have at least limit_bytes, stream ended cleanly
664 CollectExactResult::AtLeast {
665 buffered: data,
666 remaining: None,
667 }
668 } else {
669 // Not enough bytes
670 CollectExactResult::Incomplete {
671 buffered: Some(data),
672 error: None,
673 }
674 }
675 }
676 BufferedBody::Complete(None) => {
677 // Empty body
678 CollectExactResult::Incomplete {
679 buffered: None,
680 error: None,
681 }
682 }
683
684 // Partial - combine prefix with remaining stream
685 BufferedBody::Partial(partial) => {
686 let (prefix, remaining) = partial.into_parts();
687
688 match prefix {
689 Some(buffered) if buffered.len() >= limit_bytes => {
690 // Prefix already has enough bytes - preserve the remaining state
691 CollectExactResult::AtLeast {
692 buffered,
693 remaining: Some(remaining),
694 }
695 }
696 prefix => {
697 // Need to read more from remaining stream
698 let prefix_len = prefix.as_ref().map(|p| p.len()).unwrap_or(0);
699 match remaining {
700 Remaining::Body(stream) => {
701 // Read more bytes from stream
702 let needed = limit_bytes - prefix_len;
703 let result = collect_exact_from_stream(stream, needed).await;
704 match result {
705 CollectExactResult::AtLeast {
706 buffered: new_bytes,
707 remaining,
708 } => {
709 let combined = combine_bytes(prefix, new_bytes);
710 CollectExactResult::AtLeast {
711 buffered: combined,
712 remaining,
713 }
714 }
715 CollectExactResult::Incomplete {
716 buffered: new_bytes,
717 error,
718 } => {
719 let combined = if let Some(new) = new_bytes {
720 Some(combine_bytes(prefix, new))
721 } else {
722 prefix
723 };
724 CollectExactResult::Incomplete {
725 buffered: combined,
726 error,
727 }
728 }
729 }
730 }
731 Remaining::Error(error) => {
732 // Already have an error, can't read more
733 CollectExactResult::Incomplete {
734 buffered: prefix,
735 error,
736 }
737 }
738 }
739 }
740 }
741 }
742
743 // Passthrough - read from stream
744 BufferedBody::Passthrough(stream) => {
745 collect_exact_from_stream(stream, limit_bytes).await
746 }
747 }
748 }
749}
750
751/// Helper function to collect exactly N bytes from a stream.
752async fn collect_exact_from_stream<B>(mut stream: B, limit_bytes: usize) -> CollectExactResult<B>
753where
754 B: HttpBody + Unpin,
755{
756 use http_body_util::BodyExt;
757
758 let mut buffer = BytesMut::new();
759
760 // Read until we have at least limit_bytes
761 while buffer.len() < limit_bytes {
762 match stream.frame().await {
763 Some(Ok(frame)) => {
764 if let Ok(mut data) = frame.into_data() {
765 buffer.extend_from_slice(&data.copy_to_bytes(data.remaining()));
766 }
767 }
768 Some(Err(error)) => {
769 // Error while reading
770 return CollectExactResult::Incomplete {
771 buffered: if buffer.is_empty() {
772 None
773 } else {
774 Some(buffer.freeze())
775 },
776 error: Some(error),
777 };
778 }
779 None => {
780 // Stream ended before we got limit_bytes
781 return CollectExactResult::Incomplete {
782 buffered: if buffer.is_empty() {
783 None
784 } else {
785 Some(buffer.freeze())
786 },
787 error: None,
788 };
789 }
790 }
791 }
792
793 // We have at least limit_bytes
794 // Return the buffered data and the remaining stream
795 CollectExactResult::AtLeast {
796 buffered: buffer.freeze(),
797 remaining: Some(Remaining::Body(stream)),
798 }
799}
800
801impl<B> fmt::Debug for BufferedBody<B>
802where
803 B: HttpBody,
804{
805 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
806 match self {
807 BufferedBody::Complete(Some(bytes)) => f
808 .debug_tuple("Complete")
809 .field(&format!("{} bytes", bytes.len()))
810 .finish(),
811 BufferedBody::Complete(None) => f.debug_tuple("Complete").field(&"consumed").finish(),
812 BufferedBody::Partial(partial) => {
813 let prefix_len = partial.prefix().map(|b| b.len()).unwrap_or(0);
814 f.debug_struct("Partial")
815 .field("prefix_len", &prefix_len)
816 .field("remaining", &"...")
817 .finish()
818 }
819 BufferedBody::Passthrough(_) => f.debug_tuple("Passthrough").field(&"...").finish(),
820 }
821 }
822}