1use futures_lite::{io::Cursor, ready, AsyncRead, AsyncReadExt};
2use std::{
3 borrow::Cow,
4 fmt::Debug,
5 io::{Error, ErrorKind, Result},
6 pin::Pin,
7 task::{Context, Poll},
8};
9use BodyType::{Empty, Static, Streaming};
10
/// A body of bytes that is either empty, held in memory, or streamed from an
/// [`AsyncRead`].
///
/// The variant in use is tracked by the private `BodyType` this struct wraps.
/// The `Default` value is an empty body.
#[derive(Debug, Default)]
pub struct Body(BodyType);
16
17impl Body {
18 pub fn new_streaming(
22 async_read: impl AsyncRead + Send + Sync + 'static,
23 len: Option<u64>,
24 ) -> Self {
25 Self(Streaming {
26 async_read: Box::pin(async_read),
27 len,
28 done: false,
29 progress: 0,
30 })
31 }
32
33 pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
36 Self(Static {
37 content: content.into(),
38 cursor: 0,
39 })
40 }
41
42 pub fn static_bytes(&self) -> Option<&[u8]> {
46 match &self.0 {
47 Static { content, .. } => Some(content.as_ref()),
48 _ => None,
49 }
50 }
51
52 pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync>> {
57 match self.0 {
58 Streaming { async_read, .. } => async_read,
59 Static { content, .. } => Box::pin(Cursor::new(content)),
60 Empty => Box::pin(Cursor::new("")),
61 }
62 }
63
64 #[allow(clippy::missing_errors_doc)] pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
81 match self.0 {
82 Static { content, .. } => Ok(content),
83
84 Streaming {
85 mut async_read,
86 len,
87 progress: 0,
88 done: false,
89 } => {
90 let mut buf = len
91 .and_then(|c| c.try_into().ok())
92 .map(Vec::with_capacity)
93 .unwrap_or_default();
94
95 async_read.read_to_end(&mut buf).await?;
96
97 Ok(Cow::Owned(buf))
98 }
99
100 Empty => Ok(Cow::Borrowed(b"")),
101
102 Streaming { .. } => Err(Error::new(
103 ErrorKind::Other,
104 "body already read to completion",
105 )),
106 }
107 }
108
109 pub fn bytes_read(&self) -> u64 {
112 self.0.bytes_read()
113 }
114
115 pub fn len(&self) -> Option<u64> {
118 self.0.len()
119 }
120
121 pub fn is_empty(&self) -> bool {
123 self.0.is_empty()
124 }
125
126 pub fn is_static(&self) -> bool {
128 matches!(self.0, Static { .. })
129 }
130
131 pub fn is_streaming(&self) -> bool {
133 matches!(self.0, Streaming { .. })
134 }
135}
136
/// Returns the largest number of payload bytes that can be read into a buffer
/// of `buf_len` bytes while still leaving room for chunk framing.
///
/// The framing around a payload is its length in hexadecimal, a CRLF, and a
/// second CRLF after the payload. The result is computed with integer
/// arithmetic only, so it is exact for every `usize`.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that can hold one
/// payload byte plus its framing.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.\n \
         if this is a problem for you, please open an issue"
    );

    // Two CRLF pairs are always present: one after the hex length and one
    // after the payload.
    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // Reserve ceil(log16(remaining)) bytes for the hex length. For any
    // remaining >= 2 this equals the number of hex digits of `remaining - 1`,
    // which is counted here by shifting out one nibble at a time.
    let mut max_bytes_of_hex_framing = 0;
    let mut unshifted = bytes_remaining_after_two_cr_lns - 1;
    while unshifted > 0 {
        unshifted >>= 4;
        max_bytes_of_hex_framing += 1;
    }

    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
169
170impl AsyncRead for Body {
171 fn poll_read(
172 mut self: Pin<&mut Self>,
173 cx: &mut Context<'_>,
174 buf: &mut [u8],
175 ) -> Poll<Result<usize>> {
176 match &mut self.0 {
177 Empty => Poll::Ready(Ok(0)),
178 Static { content, cursor } => {
179 let length = content.len();
180 if length == *cursor {
181 return Poll::Ready(Ok(0));
182 }
183 let bytes = (length - *cursor).min(buf.len());
184 buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
185 *cursor += bytes;
186 Poll::Ready(Ok(bytes))
187 }
188
189 Streaming {
190 async_read,
191 len: Some(len),
192 done,
193 progress,
194 } => {
195 if *done {
196 return Poll::Ready(Ok(0));
197 }
198
199 let max_bytes_to_read = (*len - *progress)
200 .try_into()
201 .unwrap_or(buf.len())
202 .min(buf.len());
203
204 let bytes = ready!(async_read
205 .as_mut()
206 .poll_read(cx, &mut buf[..max_bytes_to_read]))?;
207
208 if bytes == 0 {
209 *done = true;
210 } else {
211 *progress += bytes as u64;
212 }
213
214 Poll::Ready(Ok(bytes))
215 }
216
217 Streaming {
218 async_read,
219 len: None,
220 done,
221 progress,
222 } => {
223 if *done {
224 return Poll::Ready(Ok(0));
225 }
226
227 let max_bytes_to_read = max_bytes_to_read(buf.len());
228
229 let bytes = ready!(async_read
230 .as_mut()
231 .poll_read(cx, &mut buf[..max_bytes_to_read]))?;
232
233 if bytes == 0 {
234 *done = true;
235 } else {
236 *progress += bytes as u64;
237 }
238
239 let start = format!("{bytes:X}\r\n");
240 let start_length = start.as_bytes().len();
241 let total = bytes + start_length + 2;
242 buf.copy_within(..bytes, start_length);
243 buf[..start_length].copy_from_slice(start.as_bytes());
244 buf[total - 2..total].copy_from_slice(b"\r\n");
245 Poll::Ready(Ok(total))
246 }
247 }
248 }
249}
250
/// The internal representation of a `Body`.
#[derive(Default)]
enum BodyType {
    /// A body with no content; this is the default.
    #[default]
    Empty,

    /// A body whose content is held in memory.
    Static {
        /// The complete content, either borrowed for `'static` or owned.
        content: Cow<'static, [u8]>,
        /// Offset into `content` of the next byte `poll_read` will yield.
        cursor: usize,
    },

    /// A body whose content is read from an `AsyncRead`.
    Streaming {
        /// The reader the content is pulled from.
        async_read: Pin<Box<dyn AsyncRead + Send + Sync + 'static>>,
        /// Number of bytes received from `async_read` through `poll_read` so far.
        progress: u64,
        /// Declared content length, if known. When `None`, `poll_read` emits
        /// hex-length / CRLF chunk framing around each read.
        len: Option<u64>,
        /// Set once `async_read` has returned zero bytes.
        done: bool,
    },
}
268
269impl Debug for BodyType {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 match self {
272 Empty => f.debug_tuple("BodyType::Empty").finish(),
273 Static { content, cursor } => f
274 .debug_struct("BodyType::Static")
275 .field("content", &String::from_utf8_lossy(content))
276 .field("cursor", cursor)
277 .finish(),
278 Streaming {
279 len,
280 done,
281 progress,
282 ..
283 } => f
284 .debug_struct("BodyType::Streaming")
285 .field("async_read", &"..")
286 .field("len", &len)
287 .field("done", &done)
288 .field("progress", &progress)
289 .finish(),
290 }
291 }
292}
293
294impl BodyType {
295 fn is_empty(&self) -> bool {
296 match *self {
297 Empty => true,
298 Static { ref content, .. } => content.is_empty(),
299 Streaming { len, .. } => len == Some(0),
300 }
301 }
302
303 fn len(&self) -> Option<u64> {
304 match *self {
305 Empty => Some(0),
306 Static { ref content, .. } => Some(content.len() as u64),
307 Streaming { len, .. } => len,
308 }
309 }
310
311 fn bytes_read(&self) -> u64 {
312 match *self {
313 Empty => 0,
314 Static { cursor, .. } => cursor as u64,
315 Streaming { progress, .. } => progress,
316 }
317 }
318}
319
320impl From<String> for Body {
321 fn from(s: String) -> Self {
322 s.into_bytes().into()
323 }
324}
325
326impl From<&'static str> for Body {
327 fn from(s: &'static str) -> Self {
328 s.as_bytes().into()
329 }
330}
331
332impl From<&'static [u8]> for Body {
333 fn from(content: &'static [u8]) -> Self {
334 Self::new_static(content)
335 }
336}
337
338impl From<Vec<u8>> for Body {
339 fn from(content: Vec<u8>) -> Self {
340 Self::new_static(content)
341 }
342}
343
344impl From<Cow<'static, [u8]>> for Body {
345 fn from(value: Cow<'static, [u8]>) -> Self {
346 Self::new_static(value)
347 }
348}
349
350impl From<Cow<'static, str>> for Body {
351 fn from(value: Cow<'static, str>) -> Self {
352 match value {
353 Cow::Borrowed(b) => b.into(),
354 Cow::Owned(o) => o.into(),
355 }
356 }
357}
358
#[cfg(test)]
mod test_bytes_to_read {
    /// Checks `max_bytes_to_read` against hand-computed values, and checks
    /// that payload plus framing fills the buffer to within one byte.
    #[test]
    fn simple_check_of_known_values() {
        // (buffer length, expected payload capacity)
        let known_values = [
            (6, 1),
            (7, 2),
            (20, 15),
            (21, 15),
            (22, 16),
            (23, 17),
            (260, 254),
            (261, 254),
            (262, 255),
            (263, 256),
            (4100, 4093),
            (4101, 4093),
            (4102, 4094),
            (4103, 4095),
            (4104, 4096),
        ];

        for &(input, expected) in &known_values {
            let actual = super::max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Payload, two CRLF pairs, and the hex digits of the length.
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            assert!(
                [input, input - 1].contains(&used_bytes),
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}