http_cache/body.rs
1//! HTTP body types for streaming cache support.
2//!
3//! This module provides the [`StreamingBody`] type which allows HTTP cache middleware
4//! to handle both cached (buffered) responses and streaming responses from upstream
5//! servers without requiring full buffering of large responses.
6//! This implementation provides efficient streaming capabilities for HTTP caching.
7
8#![allow(missing_docs)]
9
10use std::{
11 pin::Pin,
12 task::{Context, Poll},
13};
14
15use bytes::Bytes;
16use http_body::{Body, Frame};
17use pin_project_lite::pin_project;
18
19use crate::error::StreamingError;
20
21#[cfg(feature = "streaming")]
22pin_project! {
23 /// A body type that can represent either buffered data from cache, streaming body from upstream,
24 /// or streaming from a file for file-based caching.
25 ///
26 /// This enum allows the HTTP cache middleware to efficiently handle:
27 /// - Cached responses (buffered data)
28 /// - Cache misses (streaming from upstream)
29 /// - File-based cached responses (streaming from disk)
30 ///
31 /// # Variants
32 ///
33 /// - [`Buffered`](StreamingBody::Buffered): Contains cached response data that can be sent immediately
34 /// - [`Streaming`](StreamingBody::Streaming): Wraps an upstream body for streaming responses
35 /// - [`File`](StreamingBody::File): Streams directly from a file for zero-copy caching
36 ///
37 /// # Example
38 ///
39 /// ```rust
40 /// use http_cache::StreamingBody;
41 /// use bytes::Bytes;
42 /// use http_body_util::Full;
43 ///
44 /// // Cached response - sent immediately from memory
45 /// let cached: StreamingBody<Full<Bytes>> = StreamingBody::buffered(Bytes::from("Hello from cache!"));
46 ///
47 /// // Streaming response - passed through from upstream
48 /// # struct MyBody;
49 /// # impl http_body::Body for MyBody {
50 /// # type Data = bytes::Bytes;
51 /// # type Error = Box<dyn std::error::Error + Send + Sync>;
52 /// # fn poll_frame(
53 /// # self: std::pin::Pin<&mut Self>,
54 /// # _: &mut std::task::Context<'_>
55 /// # ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
56 /// # std::task::Poll::Ready(None)
57 /// # }
58 /// # }
59 /// let upstream_body = MyBody;
60 /// let streaming = StreamingBody::streaming(upstream_body);
61 /// ```
62 #[project = StreamingBodyProj]
63 pub enum StreamingBody<B> {
64 Buffered {
65 data: Option<Bytes>,
66 },
67 Streaming {
68 #[pin]
69 inner: B,
70 },
71 File {
72 #[pin]
73 file: crate::runtime::File,
74 buf: Vec<u8>,
75 finished: bool,
76 },
77 }
78}
79
80#[cfg(not(feature = "streaming"))]
81pin_project! {
82 /// A body type that can represent either buffered data from cache or streaming body from upstream.
83 ///
84 /// This enum allows the HTTP cache middleware to efficiently handle:
85 /// - Cached responses (buffered data)
86 /// - Cache misses (streaming from upstream)
87 ///
88 /// # Variants
89 ///
90 /// - [`Buffered`](StreamingBody::Buffered): Contains cached response data that can be sent immediately
91 /// - [`Streaming`](StreamingBody::Streaming): Wraps an upstream body for streaming responses
92 ///
93 /// # Example
94 ///
95 /// ```rust
96 /// use http_cache::StreamingBody;
97 /// use bytes::Bytes;
98 /// use http_body_util::Full;
99 ///
100 /// // Cached response - sent immediately from memory
101 /// let cached: StreamingBody<Full<Bytes>> = StreamingBody::buffered(Bytes::from("Hello from cache!"));
102 ///
103 /// // Streaming response - passed through from upstream
104 /// # struct MyBody;
105 /// # impl http_body::Body for MyBody {
106 /// # type Data = bytes::Bytes;
107 /// # type Error = Box<dyn std::error::Error + Send + Sync>;
108 /// # fn poll_frame(
109 /// # self: std::pin::Pin<&mut Self>,
110 /// # _: &mut std::task::Context<'_>
111 /// # ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
112 /// # std::task::Poll::Ready(None)
113 /// # }
114 /// # }
115 /// let upstream_body = MyBody;
116 /// let streaming = StreamingBody::streaming(upstream_body);
117 /// ```
118 #[project = StreamingBodyProj]
119 pub enum StreamingBody<B> {
120 Buffered {
121 data: Option<Bytes>,
122 },
123 Streaming {
124 #[pin]
125 inner: B,
126 },
127 }
128}
129
130impl<B> StreamingBody<B> {
131 /// Create a new buffered body from bytes
132 pub fn buffered(data: Bytes) -> Self {
133 Self::Buffered { data: Some(data) }
134 }
135
136 /// Create a new streaming body from an upstream body
137 pub fn streaming(body: B) -> Self {
138 Self::Streaming { inner: body }
139 }
140
141 /// Create a new file-based streaming body
142 #[cfg(feature = "streaming")]
143 pub fn from_file(file: crate::runtime::File) -> Self {
144 Self::File { file, buf: Vec::new(), finished: false }
145 }
146}
147
148impl<B> Body for StreamingBody<B>
149where
150 B: Body + Unpin,
151 B::Error: Into<StreamingError>,
152 B::Data: Into<Bytes>,
153{
154 type Data = Bytes;
155 type Error = StreamingError;
156
157 fn poll_frame(
158 mut self: Pin<&mut Self>,
159 cx: &mut Context<'_>,
160 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
161 match self.as_mut().project() {
162 StreamingBodyProj::Buffered { data } => {
163 if let Some(bytes) = data.take() {
164 if bytes.is_empty() {
165 Poll::Ready(None)
166 } else {
167 Poll::Ready(Some(Ok(Frame::data(bytes))))
168 }
169 } else {
170 Poll::Ready(None)
171 }
172 }
173 StreamingBodyProj::Streaming { inner } => {
174 inner.poll_frame(cx).map(|opt| {
175 opt.map(|res| {
176 res.map(|frame| frame.map_data(Into::into))
177 .map_err(Into::into)
178 })
179 })
180 }
181 #[cfg(feature = "streaming")]
182 StreamingBodyProj::File { file, buf, finished } => {
183 if *finished {
184 return Poll::Ready(None);
185 }
186
187 // Prepare buffer
188 buf.resize(8192, 0);
189
190 cfg_if::cfg_if! {
191 if #[cfg(feature = "streaming-tokio")] {
192 use tokio::io::AsyncRead;
193 use crate::runtime::ReadBuf;
194
195 let mut read_buf = ReadBuf::new(buf);
196 match file.poll_read(cx, &mut read_buf) {
197 Poll::Pending => Poll::Pending,
198 Poll::Ready(Err(e)) => {
199 *finished = true;
200 Poll::Ready(Some(Err(StreamingError::new(e))))
201 }
202 Poll::Ready(Ok(())) => {
203 let n = read_buf.filled().len();
204 if n == 0 {
205 // EOF
206 *finished = true;
207 Poll::Ready(None)
208 } else {
209 let chunk = Bytes::copy_from_slice(&buf[..n]);
210 buf.clear();
211 Poll::Ready(Some(Ok(Frame::data(chunk))))
212 }
213 }
214 }
215 } else if #[cfg(feature = "streaming-smol")] {
216 use futures::io::AsyncRead;
217
218 match file.poll_read(cx, buf) {
219 Poll::Pending => Poll::Pending,
220 Poll::Ready(Err(e)) => {
221 *finished = true;
222 Poll::Ready(Some(Err(StreamingError::new(e))))
223 }
224 Poll::Ready(Ok(0)) => {
225 // EOF
226 *finished = true;
227 Poll::Ready(None)
228 }
229 Poll::Ready(Ok(n)) => {
230 let chunk = Bytes::copy_from_slice(&buf[..n]);
231 buf.clear();
232 Poll::Ready(Some(Ok(Frame::data(chunk))))
233 }
234 }
235 }
236 }
237 }
238 }
239 }
240
241 fn is_end_stream(&self) -> bool {
242 match self {
243 StreamingBody::Buffered { data } => data.is_none(),
244 StreamingBody::Streaming { inner } => inner.is_end_stream(),
245 #[cfg(feature = "streaming")]
246 StreamingBody::File { finished, .. } => *finished,
247 }
248 }
249
250 fn size_hint(&self) -> http_body::SizeHint {
251 match self {
252 StreamingBody::Buffered { data } => {
253 if let Some(bytes) = data {
254 let len = bytes.len() as u64;
255 http_body::SizeHint::with_exact(len)
256 } else {
257 http_body::SizeHint::with_exact(0)
258 }
259 }
260 StreamingBody::Streaming { inner } => inner.size_hint(),
261 #[cfg(feature = "streaming")]
262 StreamingBody::File { .. } => {
263 // We don't know the file size in advance without an additional stat call
264 http_body::SizeHint::default()
265 }
266 }
267 }
268}
269
270impl<B> From<Bytes> for StreamingBody<B> {
271 fn from(bytes: Bytes) -> Self {
272 Self::buffered(bytes)
273 }
274}
275
276#[cfg(feature = "streaming")]
277impl<B> StreamingBody<B>
278where
279 B: Body + Unpin + Send,
280 B::Error: Into<StreamingError>,
281 B::Data: Into<Bytes>,
282{
283 /// Convert this streaming body into a stream of Bytes.
284 ///
285 /// This method allows for streaming without collecting the entire body into memory first.
286 /// This is particularly useful for file-based cached responses which can stream
287 /// directly from disk.
288 pub fn into_bytes_stream(
289 self,
290 ) -> impl futures_util::Stream<
291 Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>,
292 > + Send {
293 use futures_util::TryStreamExt;
294
295 http_body_util::BodyStream::new(self)
296 .map_ok(|frame| {
297 // Extract data from frame, StreamingBody always produces Bytes
298 frame.into_data().unwrap_or_else(|_| Bytes::new())
299 })
300 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> {
301 Box::new(std::io::Error::other(format!("Stream error: {e}")))
302 })
303 }
304}