1use crate::error::{BoxError, Error};
12use bytes::Bytes;
13
14#[doc(hidden)]
16pub use http_body::Body as HttpBody;
17
18pub type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
27
28pub type BoxBodySync = http_body_util::combinators::BoxBody<Bytes, Error>;
33
34pub fn boxed<B>(body: B) -> BoxBody
41where
42 B: http_body::Body<Data = Bytes> + Send + 'static,
43 B::Error: Into<BoxError>,
44{
45 use http_body_util::BodyExt;
46
47 try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
48}
49
50pub fn boxed_sync<B>(body: B) -> BoxBodySync
52where
53 B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
54 B::Error: Into<BoxError>,
55{
56 use http_body_util::BodyExt;
57 body.map_err(Error::new).boxed()
58}
59
60#[doc(hidden)]
61pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
62where
63 T: 'static,
64 K: Send + 'static,
65{
66 let mut k = Some(k);
67 if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
68 Ok(k.take().unwrap())
69 } else {
70 Err(k.unwrap())
71 }
72}
73
74pub fn empty() -> BoxBody {
76 boxed(http_body_util::Empty::<Bytes>::new())
77}
78
79pub fn empty_sync() -> BoxBodySync {
81 boxed_sync(http_body_util::Empty::<Bytes>::new())
82}
83
84#[doc(hidden)]
88pub fn to_boxed<B>(body: B) -> BoxBody
89where
90 B: Into<Bytes>,
91{
92 boxed(http_body_util::Full::new(body.into()))
93}
94
95#[doc(hidden)]
99pub fn to_boxed_sync<B>(body: B) -> BoxBodySync
100where
101 B: Into<Bytes>,
102{
103 boxed_sync(http_body_util::Full::new(body.into()))
104}
105
106pub fn from_bytes(bytes: Bytes) -> BoxBody {
108 boxed(http_body_util::Full::new(bytes))
109}
110
111pub fn wrap_stream<S, O, E>(stream: S) -> BoxBody
127where
128 S: futures_util::Stream<Item = Result<O, E>> + Send + 'static,
129 O: Into<Bytes> + 'static,
130 E: Into<BoxError> + 'static,
131{
132 use futures_util::TryStreamExt;
133 use http_body_util::StreamBody;
134
135 let frame_stream = stream
137 .map_ok(|chunk| http_body::Frame::data(chunk.into()))
138 .map_err(|e| Error::new(e.into()));
139
140 boxed(StreamBody::new(frame_stream))
141}
142
143pub fn wrap_stream_sync<S, O, E>(stream: S) -> BoxBodySync
151where
152 S: futures_util::Stream<Item = Result<O, E>> + Send + Sync + 'static,
153 O: Into<Bytes> + 'static,
154 E: Into<BoxError> + 'static,
155{
156 use futures_util::TryStreamExt;
157 use http_body_util::StreamBody;
158
159 let frame_stream = stream
161 .map_ok(|chunk| http_body::Frame::data(chunk.into()))
162 .map_err(|e| Error::new(e.into()));
163
164 boxed_sync(StreamBody::new(frame_stream))
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170
171 async fn collect_bytes<B>(body: B) -> Result<Bytes, Error>
176 where
177 B: HttpBody,
178 B::Error: Into<BoxError>,
179 {
180 use http_body_util::BodyExt;
181
182 let collected = body.collect().await.map_err(Error::new)?;
183 Ok(collected.to_bytes())
184 }
185
186 #[tokio::test]
187 async fn test_empty_body() {
188 let body = empty();
189 let bytes = collect_bytes(body).await.unwrap();
190 assert_eq!(bytes.len(), 0);
191 }
192
193 #[tokio::test]
194 async fn test_from_bytes() {
195 let data = Bytes::from("hello world");
196 let body = from_bytes(data.clone());
197 let collected = collect_bytes(body).await.unwrap();
198 assert_eq!(collected, data);
199 }
200
201 #[tokio::test]
202 async fn test_to_boxed_string() {
203 let s = "hello world";
204 let body = to_boxed(s);
205 let collected = collect_bytes(body).await.unwrap();
206 assert_eq!(collected, Bytes::from(s));
207 }
208
209 #[tokio::test]
210 async fn test_to_boxed_vec() {
211 let vec = vec![1u8, 2, 3, 4, 5];
212 let body = to_boxed(vec.clone());
213 let collected = collect_bytes(body).await.unwrap();
214 assert_eq!(collected.as_ref(), vec.as_slice());
215 }
216
217 #[tokio::test]
218 async fn test_boxed() {
219 use http_body_util::Full;
220 let full_body = Full::new(Bytes::from("test data"));
221 let boxed_body: BoxBody = boxed(full_body);
222 let collected = collect_bytes(boxed_body).await.unwrap();
223 assert_eq!(collected, Bytes::from("test data"));
224 }
225
226 #[tokio::test]
227 async fn test_boxed_sync() {
228 use http_body_util::Full;
229 let full_body = Full::new(Bytes::from("sync test"));
230 let boxed_body: BoxBodySync = boxed_sync(full_body);
231 let collected = collect_bytes(boxed_body).await.unwrap();
232 assert_eq!(collected, Bytes::from("sync test"));
233 }
234
235 #[tokio::test]
236 async fn test_wrap_stream_single_chunk() {
237 use futures_util::stream;
238
239 let data = Bytes::from("single chunk");
240 let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);
241
242 let body = wrap_stream(stream);
243 let collected = collect_bytes(body).await.unwrap();
244 assert_eq!(collected, data);
245 }
246
247 #[tokio::test]
248 async fn test_wrap_stream_multiple_chunks() {
249 use futures_util::stream;
250
251 let chunks = vec![
252 Ok::<_, std::io::Error>(Bytes::from("chunk1")),
253 Ok(Bytes::from("chunk2")),
254 Ok(Bytes::from("chunk3")),
255 ];
256 let expected = Bytes::from("chunk1chunk2chunk3");
257
258 let stream = stream::iter(chunks);
259 let body = wrap_stream(stream);
260 let collected = collect_bytes(body).await.unwrap();
261 assert_eq!(collected, expected);
262 }
263
264 #[tokio::test]
265 async fn test_wrap_stream_empty() {
266 use futures_util::stream;
267
268 let stream = stream::iter(vec![Ok::<_, std::io::Error>(Bytes::new())]);
269
270 let body = wrap_stream(stream);
271 let collected = collect_bytes(body).await.unwrap();
272 assert_eq!(collected.len(), 0);
273 }
274
275 #[tokio::test]
276 async fn test_wrap_stream_error() {
277 use futures_util::stream;
278
279 let chunks = vec![
280 Ok::<_, std::io::Error>(Bytes::from("chunk1")),
281 Err(std::io::Error::other("test error")),
282 ];
283
284 let stream = stream::iter(chunks);
285 let body = wrap_stream(stream);
286 let result = collect_bytes(body).await;
287 assert!(result.is_err());
288 }
289
290 #[tokio::test]
291 async fn test_wrap_stream_various_types() {
292 use futures_util::stream;
293
294 let chunks = vec![Ok::<_, std::io::Error>("string slice"), Ok("another string")];
298 let stream = stream::iter(chunks);
299 let body = wrap_stream(stream);
300 let collected = collect_bytes(body).await.unwrap();
301 assert_eq!(collected, Bytes::from("string sliceanother string"));
302
303 let chunks = vec![
305 Ok::<_, std::io::Error>(String::from("owned ")),
306 Ok(String::from("strings")),
307 ];
308 let stream = stream::iter(chunks);
309 let body = wrap_stream(stream);
310 let collected = collect_bytes(body).await.unwrap();
311 assert_eq!(collected, Bytes::from("owned strings"));
312
313 let chunks = vec![
315 Ok::<_, std::io::Error>(vec![72u8, 101, 108, 108, 111]), Ok(vec![32u8, 87, 111, 114, 108, 100]), ];
318 let stream = stream::iter(chunks);
319 let body = wrap_stream(stream);
320 let collected = collect_bytes(body).await.unwrap();
321 assert_eq!(collected, Bytes::from("Hello World"));
322
323 let chunks = vec![
325 Ok::<_, std::io::Error>(&[98u8, 121, 116, 101] as &[u8]), Ok(&[115u8, 33] as &[u8]), ];
328 let stream = stream::iter(chunks);
329 let body = wrap_stream(stream);
330 let collected = collect_bytes(body).await.unwrap();
331 assert_eq!(collected, Bytes::from("bytes!"));
332
333 struct CustomChunk {
335 data: String,
336 }
337
338 impl From<CustomChunk> for Bytes {
339 fn from(chunk: CustomChunk) -> Bytes {
340 Bytes::from(chunk.data)
341 }
342 }
343
344 let chunks = vec![
345 Ok::<_, std::io::Error>(CustomChunk { data: "custom ".into() }),
346 Ok(CustomChunk { data: "struct".into() }),
347 ];
348 let stream = stream::iter(chunks);
349 let body = wrap_stream(stream);
350 let collected = collect_bytes(body).await.unwrap();
351 assert_eq!(collected, Bytes::from("custom struct"));
352 }
353
354 #[tokio::test]
355 async fn test_wrap_stream_custom_stream_type() {
356 use bytes::Bytes;
357 use std::pin::Pin;
358 use std::task::{Context, Poll};
359
360 struct CustomStream {
362 chunks: Vec<Result<Bytes, std::io::Error>>,
363 }
364
365 impl CustomStream {
366 fn new(chunks: Vec<Result<Bytes, std::io::Error>>) -> Self {
367 Self { chunks }
368 }
369 }
370
371 impl futures_util::Stream for CustomStream {
372 type Item = Result<Bytes, std::io::Error>;
373
374 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
375 if self.chunks.is_empty() {
376 Poll::Ready(None)
377 } else {
378 Poll::Ready(Some(self.chunks.remove(0)))
379 }
380 }
381 }
382
383 let stream = CustomStream::new(vec![Ok(Bytes::from("custom ")), Ok(Bytes::from("stream"))]);
384
385 let body = wrap_stream(stream);
386 let collected = collect_bytes(body).await.unwrap();
387 assert_eq!(collected, Bytes::from("custom stream"));
388 }
389
390 #[tokio::test]
391 async fn test_wrap_stream_custom_error_type() {
392 use bytes::Bytes;
393 use futures_util::stream;
394
395 #[derive(Debug, Clone)]
397 struct CustomError {
398 message: String,
399 }
400
401 impl std::fmt::Display for CustomError {
402 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
403 write!(f, "CustomError: {}", self.message)
404 }
405 }
406
407 impl std::error::Error for CustomError {}
408
409 let chunks = vec![
411 Ok::<_, CustomError>(Bytes::from("custom ")),
412 Ok(Bytes::from("error type")),
413 ];
414 let stream = stream::iter(chunks);
415 let body = wrap_stream(stream);
416 let collected = collect_bytes(body).await.unwrap();
417 assert_eq!(collected, Bytes::from("custom error type"));
418
419 let chunks = vec![
421 Ok::<_, CustomError>(Bytes::from("data")),
422 Err(CustomError {
423 message: "custom error".into(),
424 }),
425 ];
426 let stream = stream::iter(chunks);
427 let body = wrap_stream(stream);
428 let result = collect_bytes(body).await;
429 assert!(result.is_err());
430 }
431
432 #[tokio::test]
433 async fn test_wrap_stream_incremental_consumption() {
434 use bytes::Bytes;
435 use http_body_util::BodyExt;
436 use std::pin::Pin;
437 use std::task::{Context, Poll};
438
439 struct IncrementalStream {
440 chunks: Vec<Result<Bytes, std::io::Error>>,
441 }
442
443 impl IncrementalStream {
444 fn new(chunks: Vec<Result<Bytes, std::io::Error>>) -> Self {
445 Self { chunks }
446 }
447 }
448
449 impl futures_util::Stream for IncrementalStream {
450 type Item = Result<Bytes, std::io::Error>;
451
452 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
453 if self.chunks.is_empty() {
454 Poll::Ready(None)
455 } else {
456 Poll::Ready(Some(self.chunks.remove(0)))
457 }
458 }
459 }
460
461 let stream = IncrementalStream::new(vec![
462 Ok(Bytes::from("chunk1")),
463 Ok(Bytes::from("chunk2")),
464 Ok(Bytes::from("chunk3")),
465 ]);
466
467 let mut body = wrap_stream(stream);
468
469 let frame1 = body.frame().await.unwrap().unwrap();
470 assert!(frame1.is_data());
471 assert_eq!(frame1.into_data().unwrap(), Bytes::from("chunk1"));
472
473 let frame2 = body.frame().await.unwrap().unwrap();
474 assert!(frame2.is_data());
475 assert_eq!(frame2.into_data().unwrap(), Bytes::from("chunk2"));
476
477 let frame3 = body.frame().await.unwrap().unwrap();
478 assert!(frame3.is_data());
479 assert_eq!(frame3.into_data().unwrap(), Bytes::from("chunk3"));
480
481 let frame4 = body.frame().await;
482 assert!(frame4.is_none());
483 }
484
485 #[tokio::test]
486 async fn test_wrap_stream_sync_single_chunk() {
487 use futures_util::stream;
488
489 let data = Bytes::from("sync single chunk");
490 let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);
491
492 let body = wrap_stream_sync(stream);
493 let collected = collect_bytes(body).await.unwrap();
494 assert_eq!(collected, data);
495 }
496
497 #[tokio::test]
498 async fn test_wrap_stream_sync_multiple_chunks() {
499 use futures_util::stream;
500
501 let chunks = vec![
502 Ok::<_, std::io::Error>(Bytes::from("sync1")),
503 Ok(Bytes::from("sync2")),
504 Ok(Bytes::from("sync3")),
505 ];
506 let expected = Bytes::from("sync1sync2sync3");
507
508 let stream = stream::iter(chunks);
509 let body = wrap_stream_sync(stream);
510 let collected = collect_bytes(body).await.unwrap();
511 assert_eq!(collected, expected);
512 }
513
514 #[tokio::test]
515 async fn test_empty_sync_body() {
516 let body = empty_sync();
517 let bytes = collect_bytes(body).await.unwrap();
518 assert_eq!(bytes.len(), 0);
519 }
520
521 #[tokio::test]
522 async fn test_to_boxed_sync() {
523 let data = Bytes::from("sync boxed data");
524 let body = to_boxed_sync(data.clone());
525 let collected = collect_bytes(body).await.unwrap();
526 assert_eq!(collected, data);
527 }
528
529 fn _assert_send<T: Send>() {}
532 fn _assert_sync<T: Sync>() {}
533
534 fn _assert_send_sync_bounds() {
535 _assert_send::<BoxBodySync>();
537 _assert_sync::<BoxBodySync>();
538
539 _assert_send::<BoxBody>();
541 }
542
543 #[tokio::test]
544 async fn test_wrap_stream_sync_produces_sync_body() {
545 use futures_util::stream;
546
547 let data = Bytes::from("test sync");
548 let stream = stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);
549
550 let body = wrap_stream_sync(stream);
551
552 fn check_sync<T: Sync>(_: &T) {}
554 check_sync(&body);
555
556 let collected = collect_bytes(body).await.unwrap();
557 assert_eq!(collected, data);
558 }
559
560 #[test]
561 fn test_empty_sync_is_sync() {
562 let body = empty_sync();
563 fn check_sync<T: Sync>(_: &T) {}
564 check_sync(&body);
565 }
566
567 #[test]
568 fn test_boxed_sync_is_sync() {
569 use http_body_util::Full;
570 let body = boxed_sync(Full::new(Bytes::from("test")));
571 fn check_sync<T: Sync>(_: &T) {}
572 check_sync(&body);
573 }
574}