http_cache/
body.rs

1//! HTTP body types for streaming cache support.
2//!
3//! This module provides the [`StreamingBody`] type which allows HTTP cache middleware
4//! to handle both cached (buffered) responses and streaming responses from upstream
5//! servers without requiring full buffering of large responses.
6//! This implementation provides efficient streaming capabilities for HTTP caching.
7
8#![allow(missing_docs)]
9
10use std::{
11    pin::Pin,
12    task::{Context, Poll},
13};
14
15use bytes::Bytes;
16use http_body::{Body, Frame};
17use pin_project_lite::pin_project;
18
19use crate::error::StreamingError;
20
#[cfg(feature = "streaming")]
pin_project! {
    /// Response body used by the HTTP cache middleware when the `streaming` feature is enabled.
    ///
    /// One type covers the three sources a response body can come from:
    /// - memory: bytes that are already fully buffered (e.g. a cached response)
    /// - upstream: another body that is forwarded as it arrives (e.g. on a cache miss)
    /// - disk: a file-backed cache entry that is read in chunks while the body is polled
    ///
    /// # Variants
    ///
    /// - [`Buffered`](StreamingBody::Buffered): holds the complete payload in memory
    /// - [`Streaming`](StreamingBody::Streaming): wraps an upstream body and passes its frames through
    /// - [`File`](StreamingBody::File): reads the payload from a file through a reusable scratch buffer
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache::StreamingBody;
    /// use bytes::Bytes;
    /// use http_body_util::Full;
    ///
    /// // Cached response - sent immediately from memory
    /// let cached: StreamingBody<Full<Bytes>> = StreamingBody::buffered(Bytes::from("Hello from cache!"));
    ///
    /// // Streaming response - passed through from upstream
    /// # struct MyBody;
    /// # impl http_body::Body for MyBody {
    /// #     type Data = bytes::Bytes;
    /// #     type Error = Box<dyn std::error::Error + Send + Sync>;
    /// #     fn poll_frame(
    /// #         self: std::pin::Pin<&mut Self>,
    /// #         _: &mut std::task::Context<'_>
    /// #     ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
    /// #         std::task::Poll::Ready(None)
    /// #     }
    /// # }
    /// let upstream_body = MyBody;
    /// let streaming = StreamingBody::streaming(upstream_body);
    /// ```
    #[project = StreamingBodyProj]
    pub enum StreamingBody<B> {
        // `data` becomes `None` once the payload has been handed out.
        Buffered { data: Option<Bytes> },
        // `inner` is structurally pinned so it can be polled in place.
        Streaming { #[pin] inner: B },
        // `buf` is the read scratch space; `finished` latches after EOF or a read error.
        File { #[pin] file: crate::runtime::File, buf: Vec<u8>, finished: bool },
    }
}
79
80#[cfg(not(feature = "streaming"))]
81pin_project! {
82    /// A body type that can represent either buffered data from cache or streaming body from upstream.
83    ///
84    /// This enum allows the HTTP cache middleware to efficiently handle:
85    /// - Cached responses (buffered data)
86    /// - Cache misses (streaming from upstream)
87    ///
88    /// # Variants
89    ///
90    /// - [`Buffered`](StreamingBody::Buffered): Contains cached response data that can be sent immediately
91    /// - [`Streaming`](StreamingBody::Streaming): Wraps an upstream body for streaming responses
92    ///
93    /// # Example
94    ///
95    /// ```rust
96    /// use http_cache::StreamingBody;
97    /// use bytes::Bytes;
98    /// use http_body_util::Full;
99    ///
100    /// // Cached response - sent immediately from memory
101    /// let cached: StreamingBody<Full<Bytes>> = StreamingBody::buffered(Bytes::from("Hello from cache!"));
102    ///
103    /// // Streaming response - passed through from upstream
104    /// # struct MyBody;
105    /// # impl http_body::Body for MyBody {
106    /// #     type Data = bytes::Bytes;
107    /// #     type Error = Box<dyn std::error::Error + Send + Sync>;
108    /// #     fn poll_frame(
109    /// #         self: std::pin::Pin<&mut Self>,
110    /// #         _: &mut std::task::Context<'_>
111    /// #     ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
112    /// #         std::task::Poll::Ready(None)
113    /// #     }
114    /// # }
115    /// let upstream_body = MyBody;
116    /// let streaming = StreamingBody::streaming(upstream_body);
117    /// ```
118    #[project = StreamingBodyProj]
119    pub enum StreamingBody<B> {
120        Buffered {
121            data: Option<Bytes>,
122        },
123        Streaming {
124            #[pin]
125            inner: B,
126        },
127    }
128}
129
130impl<B> StreamingBody<B> {
131    /// Create a new buffered body from bytes
132    pub fn buffered(data: Bytes) -> Self {
133        Self::Buffered { data: Some(data) }
134    }
135
136    /// Create a new streaming body from an upstream body
137    pub fn streaming(body: B) -> Self {
138        Self::Streaming { inner: body }
139    }
140
141    /// Create a new file-based streaming body
142    #[cfg(feature = "streaming")]
143    pub fn from_file(file: crate::runtime::File) -> Self {
144        Self::File { file, buf: Vec::new(), finished: false }
145    }
146}
147
148impl<B> Body for StreamingBody<B>
149where
150    B: Body + Unpin,
151    B::Error: Into<StreamingError>,
152    B::Data: Into<Bytes>,
153{
154    type Data = Bytes;
155    type Error = StreamingError;
156
157    fn poll_frame(
158        mut self: Pin<&mut Self>,
159        cx: &mut Context<'_>,
160    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
161        match self.as_mut().project() {
162            StreamingBodyProj::Buffered { data } => {
163                if let Some(bytes) = data.take() {
164                    if bytes.is_empty() {
165                        Poll::Ready(None)
166                    } else {
167                        Poll::Ready(Some(Ok(Frame::data(bytes))))
168                    }
169                } else {
170                    Poll::Ready(None)
171                }
172            }
173            StreamingBodyProj::Streaming { inner } => {
174                inner.poll_frame(cx).map(|opt| {
175                    opt.map(|res| {
176                        res.map(|frame| frame.map_data(Into::into))
177                            .map_err(Into::into)
178                    })
179                })
180            }
181            #[cfg(feature = "streaming")]
182            StreamingBodyProj::File { file, buf, finished } => {
183                if *finished {
184                    return Poll::Ready(None);
185                }
186
187                // Prepare buffer
188                buf.resize(8192, 0);
189
190                cfg_if::cfg_if! {
191                    if #[cfg(feature = "streaming-tokio")] {
192                        use tokio::io::AsyncRead;
193                        use crate::runtime::ReadBuf;
194
195                        let mut read_buf = ReadBuf::new(buf);
196                        match file.poll_read(cx, &mut read_buf) {
197                            Poll::Pending => Poll::Pending,
198                            Poll::Ready(Err(e)) => {
199                                *finished = true;
200                                Poll::Ready(Some(Err(StreamingError::new(e))))
201                            }
202                            Poll::Ready(Ok(())) => {
203                                let n = read_buf.filled().len();
204                                if n == 0 {
205                                    // EOF
206                                    *finished = true;
207                                    Poll::Ready(None)
208                                } else {
209                                    let chunk = Bytes::copy_from_slice(&buf[..n]);
210                                    buf.clear();
211                                    Poll::Ready(Some(Ok(Frame::data(chunk))))
212                                }
213                            }
214                        }
215                    } else if #[cfg(feature = "streaming-smol")] {
216                        use futures::io::AsyncRead;
217
218                        match file.poll_read(cx, buf) {
219                            Poll::Pending => Poll::Pending,
220                            Poll::Ready(Err(e)) => {
221                                *finished = true;
222                                Poll::Ready(Some(Err(StreamingError::new(e))))
223                            }
224                            Poll::Ready(Ok(0)) => {
225                                // EOF
226                                *finished = true;
227                                Poll::Ready(None)
228                            }
229                            Poll::Ready(Ok(n)) => {
230                                let chunk = Bytes::copy_from_slice(&buf[..n]);
231                                buf.clear();
232                                Poll::Ready(Some(Ok(Frame::data(chunk))))
233                            }
234                        }
235                    }
236                }
237            }
238        }
239    }
240
241    fn is_end_stream(&self) -> bool {
242        match self {
243            StreamingBody::Buffered { data } => data.is_none(),
244            StreamingBody::Streaming { inner } => inner.is_end_stream(),
245            #[cfg(feature = "streaming")]
246            StreamingBody::File { finished, .. } => *finished,
247        }
248    }
249
250    fn size_hint(&self) -> http_body::SizeHint {
251        match self {
252            StreamingBody::Buffered { data } => {
253                if let Some(bytes) = data {
254                    let len = bytes.len() as u64;
255                    http_body::SizeHint::with_exact(len)
256                } else {
257                    http_body::SizeHint::with_exact(0)
258                }
259            }
260            StreamingBody::Streaming { inner } => inner.size_hint(),
261            #[cfg(feature = "streaming")]
262            StreamingBody::File { .. } => {
263                // We don't know the file size in advance without an additional stat call
264                http_body::SizeHint::default()
265            }
266        }
267    }
268}
269
270impl<B> From<Bytes> for StreamingBody<B> {
271    fn from(bytes: Bytes) -> Self {
272        Self::buffered(bytes)
273    }
274}
275
#[cfg(feature = "streaming")]
impl<B> StreamingBody<B>
where
    B: Body + Unpin + Send,
    B::Error: Into<StreamingError>,
    B::Data: Into<Bytes>,
{
    /// Convert this body into a stream of its data chunks.
    ///
    /// The body is polled frame by frame, so it is never collected into memory
    /// as a whole. Each data frame becomes one `Ok(Bytes)` item. Frames that do
    /// not carry data (such as trailers forwarded from an upstream body) are
    /// skipped rather than being surfaced as empty chunks.
    ///
    /// # Errors
    ///
    /// A body error is yielded as an `Err` item holding a [`std::io::Error`]
    /// whose message is `"Stream error: "` followed by the original error's
    /// display text.
    pub fn into_bytes_stream(
        self,
    ) -> impl futures_util::Stream<
        Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>,
    > + Send {
        use futures_util::TryStreamExt;

        type BoxError = Box<dyn std::error::Error + Send + Sync>;

        http_body_util::BodyStream::new(self)
            .map_err(|e| -> BoxError {
                Box::new(std::io::Error::other(format!("Stream error: {e}")))
            })
            // `ready` (instead of an `async` block) keeps the returned stream
            // `Unpin` whenever the underlying body is.
            .try_filter_map(|frame| {
                std::future::ready(Ok::<_, BoxError>(frame.into_data().ok()))
            })
    }
}