Skip to main content

http_cache/
body.rs

1//! HTTP body types for streaming cache support.
2//!
3//! This module provides the [`StreamingBody`] type which allows HTTP cache middleware
4//! to handle both cached (buffered) responses and streaming responses from upstream
5//! servers without requiring full buffering of large responses.
6//!
7//! # Variants
8//!
9//! - **Buffered**: Contains cached response data that can be sent immediately
10//! - **Streaming**: Wraps an upstream body for streaming responses
11//! - **File**: Streams from a [`tokio::fs::File`] in 64KB chunks (only with `streaming` feature)
12//!
13//! # Example
14//!
15//! ```rust
16//! use http_cache::StreamingBody;
17//! use bytes::Bytes;
18//! use http_body_util::Full;
19//!
20//! // Cached response - sent immediately from memory
21//! let cached: StreamingBody<Full<Bytes>> = StreamingBody::buffered(Bytes::from("Hello!"));
22//!
23//! // Streaming response - passed through from upstream
24//! let upstream = Full::new(Bytes::from("From upstream"));
25//! let streaming: StreamingBody<Full<Bytes>> = StreamingBody::streaming(upstream);
26//! ```
27
28// Note: pin_project_lite does not support doc comments on enum variant fields,
29// so we allow missing_docs for the generated enum variants and fields.
30// The module-level and enum-level documentation provides full coverage.
31#![allow(missing_docs)]
32
33use std::{
34    fmt,
35    pin::Pin,
36    task::{Context, Poll},
37};
38
39use bytes::Bytes;
40#[cfg(feature = "streaming")]
41use bytes::BytesMut;
42use http_body::{Body, Frame};
43use pin_project_lite::pin_project;
44
45use crate::error::StreamingError;
46
/// Chunk size, in bytes, used when streaming a body from disk (64 KiB).
///
/// The file-backed body sizes its read buffer to this value, so it is both
/// the largest amount requested from the file in a single read and the upper
/// bound on the size of one emitted data frame.
#[cfg(feature = "streaming")]
const STREAM_BUFFER_SIZE: usize = 64 * 1024;
53
// `streaming` feature enabled: the enum gains the disk-backed `File` variant.
#[cfg(feature = "streaming")]
pin_project! {
    /// HTTP body that is served from memory, forwarded from an upstream body,
    /// or read incrementally from a file on disk.
    ///
    /// The cache middleware uses a single body type for all of these cases:
    /// - cache hits held in memory (buffered bytes),
    /// - cache misses (the upstream body is passed through),
    /// - cache hits stored on disk (streamed from a file).
    ///
    /// # Variants
    ///
    /// - **Buffered**: `data` holds the bytes still to be sent; it becomes
    ///   `None` once they have been taken by a poll.
    /// - **Streaming**: `inner` is the pinned upstream body being forwarded.
    /// - **File**: `reader` is the [`tokio::fs::File`] being read in 64KB
    ///   chunks, `buffer` is the scratch read buffer, `done` is set once the
    ///   file reaches EOF or a read fails, and `size` is the body length
    ///   reported through the size hint (only with `streaming` feature).
    #[project = StreamingBodyProj]
    pub enum StreamingBody<B> {
        Buffered {
            data: Option<Bytes>,
        },
        Streaming {
            #[pin]
            inner: B,
        },
        File {
            #[pin]
            reader: tokio::fs::File,
            buffer: BytesMut,
            done: bool,
            size: u64,
        },
    }
}
87
88// When streaming feature is disabled, no File variant
89#[cfg(not(feature = "streaming"))]
90pin_project! {
91    /// A body type that can represent either buffered data from cache or streaming body from upstream.
92    ///
93    /// This enum allows the HTTP cache middleware to efficiently handle:
94    /// - Cached responses (buffered data)
95    /// - Cache misses (streaming from upstream)
96    ///
97    /// # Variants
98    ///
99    /// - **Buffered**: Contains cached response data that can be sent immediately
100    /// - **Streaming**: Wraps an upstream body for streaming responses
101    #[project = StreamingBodyProj]
102    pub enum StreamingBody<B> {
103        Buffered {
104            data: Option<Bytes>,
105        },
106        Streaming {
107            #[pin]
108            inner: B,
109        },
110    }
111}
112
113impl<B> StreamingBody<B> {
114    /// Create a new buffered body from bytes.
115    ///
116    /// The bytes are consumed on the first poll and sent as a single frame.
117    #[must_use]
118    pub fn buffered(data: Bytes) -> Self {
119        Self::Buffered { data: Some(data) }
120    }
121
122    /// Create a new streaming body from an upstream body.
123    ///
124    /// The upstream body is passed through without additional buffering.
125    #[must_use]
126    pub fn streaming(body: B) -> Self {
127        Self::Streaming { inner: body }
128    }
129
130    /// Create a new file-streaming body from a [`tokio::fs::File`] with known size.
131    ///
132    /// This allows streaming large cached responses from disk without
133    /// loading the entire body into memory. Data is read in 64KB chunks.
134    ///
135    /// # Cursor contract
136    ///
137    /// The caller must position `file` at the start of the body bytes before
138    /// calling this function. Streaming uses the current cursor position and
139    /// reads until EOF — correctness relies on the caller having already
140    /// advanced past any file header (e.g. the 16-byte nonce header written
141    /// by `StreamingManager`) and on the file length exactly matching
142    /// `16 + size` so EOF lands on the body boundary.
143    ///
144    /// `size` is used to provide accurate size hints to downstream consumers.
145    #[cfg(feature = "streaming")]
146    #[must_use]
147    pub fn from_file_with_size(file: tokio::fs::File, size: u64) -> Self {
148        Self::File {
149            reader: file,
150            buffer: BytesMut::with_capacity(STREAM_BUFFER_SIZE),
151            done: false,
152            size,
153        }
154    }
155}
156
#[cfg(feature = "streaming")]
impl<B> Body for StreamingBody<B>
where
    B: Body + Unpin,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    type Data = Bytes;
    type Error = StreamingError;

    /// Produce the next frame of the body.
    ///
    /// - `Buffered`: yields the stored bytes as one data frame on the first
    ///   poll (no frame at all when they are empty), then `None`.
    /// - `Streaming`: delegates to the wrapped body, converting its data into
    ///   [`Bytes`] and its error into [`StreamingError`].
    /// - `File`: reads up to `STREAM_BUFFER_SIZE` bytes per poll and yields
    ///   them as a data frame. A zero-length read ends the stream; an I/O
    ///   error is reported once and the stream is finished afterwards.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        use tokio::io::AsyncRead;

        match self.project() {
            StreamingBodyProj::Buffered { data } => {
                // `take` leaves `None` behind, so later polls report end of stream.
                let frame = data
                    .take()
                    .filter(|bytes| !bytes.is_empty())
                    .map(|bytes| Ok(Frame::data(bytes)));
                Poll::Ready(frame)
            }
            StreamingBodyProj::Streaming { inner } => {
                inner.poll_frame(cx).map(|next| {
                    next.map(|result| {
                        result
                            .map(|frame| frame.map_data(Into::into))
                            .map_err(Into::into)
                    })
                })
            }
            StreamingBodyProj::File { reader, buffer, done, .. } => {
                if *done {
                    return Poll::Ready(None);
                }

                // Expose the full chunk to the reader; new bytes are zero-filled.
                buffer.resize(STREAM_BUFFER_SIZE, 0);

                let mut read_buf = tokio::io::ReadBuf::new(buffer.as_mut());

                match reader.poll_read(cx, &mut read_buf) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(Err(e)) => {
                        // Report the failure once; subsequent polls return `None`.
                        *done = true;
                        buffer.clear();
                        Poll::Ready(Some(Err(StreamingError::new(Box::new(e)))))
                    }
                    Poll::Ready(Ok(())) => {
                        let filled_len = read_buf.filled().len();
                        if filled_len == 0 {
                            // A successful read of zero bytes means EOF.
                            *done = true;
                            buffer.clear();
                            Poll::Ready(None)
                        } else {
                            // Keep only the bytes actually read and hand them out.
                            buffer.truncate(filled_len);
                            let bytes = buffer.split().freeze();
                            Poll::Ready(Some(Ok(Frame::data(bytes))))
                        }
                    }
                }
            }
        }
    }

    /// Report whether the body is known to be finished.
    ///
    /// A buffered body counts as finished when its bytes have been taken or
    /// when they are empty, since `poll_frame` yields no frame in either case.
    fn is_end_stream(&self) -> bool {
        match self {
            StreamingBody::Buffered { data } => {
                data.as_ref().map_or(true, |bytes| bytes.is_empty())
            }
            StreamingBody::Streaming { inner } => inner.is_end_stream(),
            StreamingBody::File { done, .. } => *done,
        }
    }

    /// Report the size of the body.
    ///
    /// Buffered bodies report the exact number of bytes not yet taken (zero
    /// once consumed); streaming bodies forward the upstream hint.
    fn size_hint(&self) -> http_body::SizeHint {
        match self {
            StreamingBody::Buffered { data } => {
                let remaining = data.as_ref().map_or(0, |bytes| bytes.len() as u64);
                http_body::SizeHint::with_exact(remaining)
            }
            StreamingBody::Streaming { inner } => inner.size_hint(),
            // NOTE(review): this is the total size given at construction; it is
            // not reduced as frames are emitted, so it is only the remaining
            // length before the first read.
            StreamingBody::File { size, .. } => {
                http_body::SizeHint::with_exact(*size)
            }
        }
    }
}
253
254#[cfg(not(feature = "streaming"))]
255impl<B> Body for StreamingBody<B>
256where
257    B: Body + Unpin,
258    B::Error: Into<StreamingError>,
259    B::Data: Into<Bytes>,
260{
261    type Data = Bytes;
262    type Error = StreamingError;
263
264    fn poll_frame(
265        mut self: Pin<&mut Self>,
266        cx: &mut Context<'_>,
267    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
268        match self.as_mut().project() {
269            StreamingBodyProj::Buffered { data } => {
270                if let Some(bytes) = data.take() {
271                    if bytes.is_empty() {
272                        Poll::Ready(None)
273                    } else {
274                        Poll::Ready(Some(Ok(Frame::data(bytes))))
275                    }
276                } else {
277                    Poll::Ready(None)
278                }
279            }
280            StreamingBodyProj::Streaming { inner } => {
281                inner.poll_frame(cx).map(|opt| {
282                    opt.map(|res| {
283                        res.map(|frame| frame.map_data(Into::into))
284                            .map_err(Into::into)
285                    })
286                })
287            }
288        }
289    }
290
291    fn is_end_stream(&self) -> bool {
292        match self {
293            StreamingBody::Buffered { data } => data.is_none(),
294            StreamingBody::Streaming { inner } => inner.is_end_stream(),
295        }
296    }
297
298    fn size_hint(&self) -> http_body::SizeHint {
299        match self {
300            StreamingBody::Buffered { data } => {
301                if let Some(bytes) = data {
302                    let len = bytes.len() as u64;
303                    http_body::SizeHint::with_exact(len)
304                } else {
305                    http_body::SizeHint::with_exact(0)
306                }
307            }
308            StreamingBody::Streaming { inner } => inner.size_hint(),
309        }
310    }
311}
312
313impl<B> From<Bytes> for StreamingBody<B> {
314    fn from(bytes: Bytes) -> Self {
315        Self::buffered(bytes)
316    }
317}
318
/// Debug output that summarises the body instead of dumping its contents:
/// buffered bodies show only presence and length, file bodies show only
/// `done` and `size` (remaining fields are elided).
#[cfg(feature = "streaming")]
impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Buffered { data } => {
                let has_data = data.is_some();
                let len = data.as_ref().map(Bytes::len);
                let mut out = f.debug_struct("StreamingBody::Buffered");
                out.field("has_data", &has_data);
                out.field("len", &len);
                out.finish()
            }
            Self::Streaming { inner } => {
                let mut out = f.debug_struct("StreamingBody::Streaming");
                out.field("inner", inner);
                out.finish()
            }
            Self::File { done, size, .. } => {
                let mut out = f.debug_struct("StreamingBody::File");
                out.field("done", done);
                out.field("size", size);
                out.finish_non_exhaustive()
            }
        }
    }
}
340
341#[cfg(not(feature = "streaming"))]
342impl<B: fmt::Debug> fmt::Debug for StreamingBody<B> {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        match self {
345            Self::Buffered { data } => f
346                .debug_struct("StreamingBody::Buffered")
347                .field("has_data", &data.is_some())
348                .field("len", &data.as_ref().map(|b| b.len()))
349                .finish(),
350            Self::Streaming { inner } => f
351                .debug_struct("StreamingBody::Streaming")
352                .field("inner", inner)
353                .finish(),
354        }
355    }
356}
357
#[cfg(feature = "streaming")]
impl<B> StreamingBody<B>
where
    B: Body + Unpin + Send,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    /// Convert this body into a stream of [`Bytes`] chunks.
    ///
    /// Frames are forwarded as they are produced, so the body is never
    /// collected into memory as a whole. Only data frames become stream
    /// items; non-data frames (such as trailers forwarded from an upstream
    /// body) are skipped rather than surfaced as empty chunks.
    ///
    /// # Errors
    ///
    /// A body error is yielded as a boxed [`std::io::Error`] whose message is
    /// `"Stream error: "` followed by the original error's display text.
    pub fn into_bytes_stream(
        self,
    ) -> impl futures_util::Stream<
        Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>,
    > + Send {
        use futures_util::TryStreamExt;

        http_body_util::BodyStream::new(self)
            // Convert the error first so the filter future below only ever
            // holds the boxed `Send + Sync` error type.
            .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
                Box::new(std::io::Error::other(format!("Stream error: {e}")))
            })
            .try_filter_map(|frame| {
                // `into_data` fails for non-data frames; drop those.
                futures_util::future::ready(Ok(frame.into_data().ok()))
            })
    }
}