Skip to main content

http_cache/
body.rs

1//! HTTP body types for streaming cache support.
2//!
3//! This module provides the [`StreamingBody`] type which allows HTTP cache middleware
4//! to handle both cached (buffered) responses and streaming responses from upstream
5//! servers without requiring full buffering of large responses.
6//!
7//! # Variants
8//!
9//! - **Buffered**: Contains cached response data that can be sent immediately
10//! - **Streaming**: Wraps an upstream body for streaming responses
11//! - **File**: Streams from a cacache Reader in 64KB chunks (only with `streaming` feature)
12//!
13//! # Example
14//!
15//! ```rust
16//! use http_cache::StreamingBody;
17//! use bytes::Bytes;
18//! use http_body_util::Full;
19//!
20//! // Cached response - sent immediately from memory
21//! let cached: StreamingBody<Full<Bytes>> = StreamingBody::buffered(Bytes::from("Hello!"));
22//!
23//! // Streaming response - passed through from upstream
24//! let upstream = Full::new(Bytes::from("From upstream"));
25//! let streaming: StreamingBody<Full<Bytes>> = StreamingBody::streaming(upstream);
26//! ```
27
28// Note: pin_project_lite does not support doc comments on enum variant fields,
29// so we allow missing_docs for the generated enum variants and fields.
30// The module-level and enum-level documentation provides full coverage.
31#![allow(missing_docs)]
32
33use std::{
34    fmt,
35    pin::Pin,
36    task::{Context, Poll},
37};
38
39use bytes::Bytes;
40#[cfg(feature = "streaming")]
41use bytes::BytesMut;
42use http_body::{Body, Frame};
43use pin_project_lite::pin_project;
44
45use crate::error::StreamingError;
46
/// Default buffer size for streaming from disk (64KB).
///
/// Used both as the initial capacity of the read buffer created by the
/// file-backed constructors and as the length the buffer is resized to before
/// each read, so a single data frame never exceeds this many bytes.
///
/// This size is intended to reduce syscall overhead on modern SSDs and NVMe
/// drives while maintaining reasonable memory usage.
#[cfg(feature = "streaming")]
const STREAM_BUFFER_SIZE: usize = 64 * 1024;
53
// When streaming feature is enabled, include the File variant
#[cfg(feature = "streaming")]
pin_project! {
    /// A body type that can represent either buffered data from cache or streaming body from upstream.
    ///
    /// This enum allows the HTTP cache middleware to efficiently handle:
    /// - Cached responses (buffered data)
    /// - Cache misses (streaming from upstream)
    /// - Disk-cached responses (streaming from file)
    ///
    /// # Variants
    ///
    /// - **Buffered**: Contains cached response data that can be sent immediately.
    ///   `data` is taken by the first poll, leaving `None` afterwards.
    /// - **Streaming**: Wraps an upstream body for streaming responses. `inner` is
    ///   structurally pinned and polled directly.
    /// - **File**: Streams from a cacache Reader in 64KB chunks (only with `streaming` feature).
    ///   `buffer` is the reusable read buffer, `done` is set once the reader reports
    ///   end-of-file or an error, and `size` is the byte length advertised through
    ///   `size_hint` when it is known.
    #[project = StreamingBodyProj]
    pub enum StreamingBody<B> {
        // In-memory payload; `None` once it has been handed out.
        Buffered {
            data: Option<Bytes>,
        },
        // Pass-through of an upstream body; pinned because `B` is polled in place.
        Streaming {
            #[pin]
            inner: B,
        },
        // Disk-backed payload read incrementally through the pinned reader.
        File {
            #[pin]
            reader: cacache::Reader,
            // Scratch space the reader fills on each poll.
            buffer: BytesMut,
            // Latched to `true` after EOF or a read error; no more frames follow.
            done: bool,
            // Length in bytes used for size hints; `None` when unknown.
            size: Option<u64>,
        },
    }
}
87
88// When streaming feature is disabled, no File variant
#[cfg(not(feature = "streaming"))]
pin_project! {
    /// A body type that can represent either buffered data from cache or streaming body from upstream.
    ///
    /// This enum allows the HTTP cache middleware to efficiently handle:
    /// - Cached responses (buffered data)
    /// - Cache misses (streaming from upstream)
    ///
    /// # Variants
    ///
    /// - **Buffered**: Contains cached response data that can be sent immediately.
    ///   `data` is taken by the first poll, leaving `None` afterwards.
    /// - **Streaming**: Wraps an upstream body for streaming responses. `inner` is
    ///   structurally pinned and polled directly.
    #[project = StreamingBodyProj]
    pub enum StreamingBody<B> {
        // In-memory payload; `None` once it has been handed out.
        Buffered {
            data: Option<Bytes>,
        },
        // Pass-through of an upstream body; pinned because `B` is polled in place.
        Streaming {
            #[pin]
            inner: B,
        },
    }
}
112
113impl<B> StreamingBody<B> {
114    /// Create a new buffered body from bytes.
115    ///
116    /// The bytes are consumed on the first poll and sent as a single frame.
117    #[must_use]
118    pub fn buffered(data: Bytes) -> Self {
119        Self::Buffered { data: Some(data) }
120    }
121
122    /// Create a new streaming body from an upstream body.
123    ///
124    /// The upstream body is passed through without additional buffering.
125    #[must_use]
126    pub fn streaming(body: B) -> Self {
127        Self::Streaming { inner: body }
128    }
129
130    /// Create a new file-streaming body from a cacache Reader.
131    ///
132    /// This allows streaming large cached responses from disk without
133    /// loading the entire body into memory. Data is read in 64KB chunks.
134    ///
135    /// Use [`from_reader_with_size`](Self::from_reader_with_size) if the
136    /// file size is known for accurate size hints.
137    #[cfg(feature = "streaming")]
138    #[must_use]
139    pub fn from_reader(reader: cacache::Reader) -> Self {
140        Self::File {
141            reader,
142            buffer: BytesMut::with_capacity(STREAM_BUFFER_SIZE),
143            done: false,
144            size: None,
145        }
146    }
147
148    /// Create a new file-streaming body from a cacache Reader with known size.
149    ///
150    /// This allows streaming large cached responses from disk without
151    /// loading the entire body into memory. Data is read in 64KB chunks.
152    ///
153    /// The size is used to provide accurate size hints to downstream consumers.
154    #[cfg(feature = "streaming")]
155    #[must_use]
156    pub fn from_reader_with_size(reader: cacache::Reader, size: u64) -> Self {
157        Self::File {
158            reader,
159            buffer: BytesMut::with_capacity(STREAM_BUFFER_SIZE),
160            done: false,
161            size: Some(size),
162        }
163    }
164}
165
#[cfg(feature = "streaming")]
impl<B> Body for StreamingBody<B>
where
    B: Body + Unpin,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    type Data = Bytes;
    type Error = StreamingError;

    /// Produce the next frame of the body.
    ///
    /// - `Buffered`: yields the whole payload as a single data frame on the
    ///   first poll (nothing at all if the payload is empty), then `None`.
    /// - `Streaming`: forwards the upstream frame, converting its data to
    ///   `Bytes` and its error to `StreamingError`.
    /// - `File`: reads up to `STREAM_BUFFER_SIZE` bytes from the reader and
    ///   yields them as one data frame; a zero-length read ends the stream.
    ///   A read error is yielded once, after which the stream is finished.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.project() {
            StreamingBodyProj::Buffered { data } => {
                // `take` leaves `None` behind so later polls report end of stream.
                let frame = data
                    .take()
                    .filter(|bytes| !bytes.is_empty())
                    .map(|bytes| Ok(Frame::data(bytes)));
                Poll::Ready(frame)
            }
            StreamingBodyProj::Streaming { inner } => {
                inner.poll_frame(cx).map(|next| {
                    next.map(|result| {
                        result
                            .map(|frame| frame.map_data(Into::into))
                            .map_err(Into::into)
                    })
                })
            }
            StreamingBodyProj::File {
                reader,
                buffer,
                done,
                size,
            } => {
                use tokio::io::AsyncRead;

                if *done {
                    return Poll::Ready(None);
                }

                // Expose the full chunk size to the reader as initialized
                // (zero-filled) memory; no `unsafe` is needed for this.
                buffer.resize(STREAM_BUFFER_SIZE, 0);
                let mut read_buf = tokio::io::ReadBuf::new(buffer.as_mut());

                match reader.poll_read(cx, &mut read_buf) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(Err(e)) => {
                        // Latch the failure so the error is reported only once.
                        *done = true;
                        buffer.clear();
                        Poll::Ready(Some(Err(StreamingError::new(Box::new(e)))))
                    }
                    Poll::Ready(Ok(())) => {
                        let filled_len = read_buf.filled().len();
                        if filled_len == 0 {
                            // A successful zero-length read means end of file.
                            *done = true;
                            buffer.clear();
                            return Poll::Ready(None);
                        }

                        // Keep the advertised length in step with what is
                        // still to come, so `size_hint` reports the
                        // *remaining* bytes. Saturate in case the size
                        // supplied by the caller was too small.
                        if let Some(remaining) = size.as_mut() {
                            *remaining = remaining.saturating_sub(filled_len as u64);
                        }

                        // Drop the unread zero padding and hand out the chunk.
                        buffer.truncate(filled_len);
                        Poll::Ready(Some(Ok(Frame::data(buffer.split().freeze()))))
                    }
                }
            }
        }
    }

    /// Report whether the body is known to be finished.
    ///
    /// A buffered body is finished once its payload has been taken, or
    /// immediately if the payload is empty (polling it would yield `None`).
    fn is_end_stream(&self) -> bool {
        match self {
            StreamingBody::Buffered { data } => data.as_ref().map_or(true, Bytes::is_empty),
            StreamingBody::Streaming { inner } => inner.is_end_stream(),
            StreamingBody::File { done, .. } => *done,
        }
    }

    /// Report bounds on the number of bytes still to be produced.
    ///
    /// Buffered bodies report the exact length of the pending payload (zero
    /// once it has been taken). File bodies report the exact remaining length
    /// when a size was supplied, zero once the stream has ended, and an
    /// unknown hint otherwise.
    fn size_hint(&self) -> http_body::SizeHint {
        match self {
            StreamingBody::Buffered { data } => {
                let pending = data.as_ref().map_or(0, |bytes| bytes.len() as u64);
                http_body::SizeHint::with_exact(pending)
            }
            StreamingBody::Streaming { inner } => inner.size_hint(),
            StreamingBody::File { done, size, .. } => match (*done, *size) {
                // Nothing more will be produced after EOF or an error.
                (true, _) => http_body::SizeHint::with_exact(0),
                (false, Some(remaining)) => http_body::SizeHint::with_exact(remaining),
                (false, None) => http_body::SizeHint::default(),
            },
        }
    }
}
267
268#[cfg(not(feature = "streaming"))]
269impl<B> Body for StreamingBody<B>
270where
271    B: Body + Unpin,
272    B::Error: Into<StreamingError>,
273    B::Data: Into<Bytes>,
274{
275    type Data = Bytes;
276    type Error = StreamingError;
277
278    fn poll_frame(
279        mut self: Pin<&mut Self>,
280        cx: &mut Context<'_>,
281    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
282        match self.as_mut().project() {
283            StreamingBodyProj::Buffered { data } => {
284                if let Some(bytes) = data.take() {
285                    if bytes.is_empty() {
286                        Poll::Ready(None)
287                    } else {
288                        Poll::Ready(Some(Ok(Frame::data(bytes))))
289                    }
290                } else {
291                    Poll::Ready(None)
292                }
293            }
294            StreamingBodyProj::Streaming { inner } => {
295                inner.poll_frame(cx).map(|opt| {
296                    opt.map(|res| {
297                        res.map(|frame| frame.map_data(Into::into))
298                            .map_err(Into::into)
299                    })
300                })
301            }
302        }
303    }
304
305    fn is_end_stream(&self) -> bool {
306        match self {
307            StreamingBody::Buffered { data } => data.is_none(),
308            StreamingBody::Streaming { inner } => inner.is_end_stream(),
309        }
310    }
311
312    fn size_hint(&self) -> http_body::SizeHint {
313        match self {
314            StreamingBody::Buffered { data } => {
315                if let Some(bytes) = data {
316                    let len = bytes.len() as u64;
317                    http_body::SizeHint::with_exact(len)
318                } else {
319                    http_body::SizeHint::with_exact(0)
320                }
321            }
322            StreamingBody::Streaming { inner } => inner.size_hint(),
323        }
324    }
325}
326
327impl<B> From<Bytes> for StreamingBody<B> {
328    fn from(bytes: Bytes) -> Self {
329        Self::buffered(bytes)
330    }
331}
332
/// Debug output names the active variant. Buffered payload bytes are never
/// printed (only presence and length), and the file variant omits its reader
/// and buffer.
#[cfg(feature = "streaming")]
impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Buffered { data } => {
                let len = data.as_ref().map(Bytes::len);
                let mut out = f.debug_struct("StreamingBody::Buffered");
                out.field("has_data", &len.is_some());
                out.field("len", &len);
                out.finish()
            }
            Self::Streaming { inner } => {
                let mut out = f.debug_struct("StreamingBody::Streaming");
                out.field("inner", inner);
                out.finish()
            }
            Self::File { done, size, .. } => {
                let mut out = f.debug_struct("StreamingBody::File");
                out.field("done", done);
                out.field("size", size);
                // Reader and buffer are intentionally left out.
                out.finish_non_exhaustive()
            }
        }
    }
}
354
355#[cfg(not(feature = "streaming"))]
356impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
357    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358        match self {
359            Self::Buffered { data } => f
360                .debug_struct("StreamingBody::Buffered")
361                .field("has_data", &data.is_some())
362                .field("len", &data.as_ref().map(|b| b.len()))
363                .finish(),
364            Self::Streaming { inner } => f
365                .debug_struct("StreamingBody::Streaming")
366                .field("inner", inner)
367                .finish(),
368        }
369    }
370}
371
#[cfg(feature = "streaming")]
impl<B> StreamingBody<B>
where
    B: Body + Unpin + Send,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    /// Turn this body into a stream of `Bytes` chunks.
    ///
    /// Frames are converted one at a time as the stream is polled, so the
    /// body is never collected into memory as a whole. A frame that carries
    /// no data is surfaced as an empty `Bytes` chunk. Errors are reported as
    /// a boxed `std::io::Error` whose message is the original error's
    /// `Display` output prefixed with `"Stream error: "`.
    pub fn into_bytes_stream(
        self,
    ) -> impl futures_util::Stream<
        Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>,
    > + Send {
        use futures_util::TryStreamExt;

        let frames = http_body_util::BodyStream::new(self);

        let chunks = frames.map_ok(|frame| match frame.into_data() {
            Ok(data) => data,
            // Not a data frame: substitute an empty chunk.
            Err(_non_data) => Bytes::new(),
        });

        chunks.map_err(|err| {
            let wrapped = std::io::Error::other(format!("Stream error: {err}"));
            Box::new(wrapped) as Box<dyn std::error::Error + Send + Sync>
        })
    }
}