1use std::{error::Error, fmt, marker, mem, pin::Pin, rc::Rc, task::Context, task::Poll};
3
4use futures_core::Stream;
5use ntex_bytes::{BytePages, Bytes, BytesMut};
6
7#[derive(Debug, PartialEq, Eq, Copy, Clone)]
8pub enum BodySize {
10 None,
11 Empty,
12 Sized(u64),
13 Stream,
14}
15
16impl BodySize {
17 pub fn is_eof(&self) -> bool {
18 matches!(self, BodySize::None | BodySize::Empty | BodySize::Sized(0))
19 }
20}
21
22pub trait MessageBody: 'static {
24 fn size(&self) -> BodySize;
26
27 fn poll_next_chunk(
28 &mut self,
29 cx: &mut Context<'_>,
30 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>>;
31}
32
33impl MessageBody for () {
34 #[inline]
35 fn size(&self) -> BodySize {
36 BodySize::Empty
37 }
38
39 #[inline]
40 fn poll_next_chunk(
41 &mut self,
42 _: &mut Context<'_>,
43 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
44 Poll::Ready(None)
45 }
46}
47
48impl<T: MessageBody> MessageBody for Box<T> {
49 #[inline]
50 fn size(&self) -> BodySize {
51 self.as_ref().size()
52 }
53
54 #[inline]
55 fn poll_next_chunk(
56 &mut self,
57 cx: &mut Context<'_>,
58 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
59 self.as_mut().poll_next_chunk(cx)
60 }
61}
62
63#[derive(Debug)]
64pub enum ResponseBody<B> {
66 Body(B),
67 Other(Body),
68}
69
70impl ResponseBody<Body> {
71 pub fn into_body<B>(self) -> ResponseBody<B> {
72 match self {
73 ResponseBody::Body(b) | ResponseBody::Other(b) => ResponseBody::Other(b),
74 }
75 }
76}
77
78impl From<ResponseBody<Body>> for Body {
79 fn from(b: ResponseBody<Body>) -> Self {
80 match b {
81 ResponseBody::Body(b) | ResponseBody::Other(b) => b,
82 }
83 }
84}
85
86impl<B> From<Body> for ResponseBody<B> {
87 fn from(b: Body) -> Self {
88 ResponseBody::Other(b)
89 }
90}
91
92impl<B> ResponseBody<B> {
93 #[inline]
94 pub fn new(body: B) -> Self {
95 ResponseBody::Body(body)
96 }
97
98 #[inline]
99 #[must_use]
100 pub fn take_body(&mut self) -> ResponseBody<B> {
101 std::mem::replace(self, ResponseBody::Other(Body::None))
102 }
103}
104
105impl<B: MessageBody> ResponseBody<B> {
106 pub fn as_ref(&self) -> Option<&B> {
107 if let ResponseBody::Body(b) = self {
108 Some(b)
109 } else {
110 None
111 }
112 }
113}
114
115impl<B: MessageBody> MessageBody for ResponseBody<B> {
116 #[inline]
117 fn size(&self) -> BodySize {
118 match self {
119 ResponseBody::Body(body) => body.size(),
120 ResponseBody::Other(body) => body.size(),
121 }
122 }
123
124 #[inline]
125 fn poll_next_chunk(
126 &mut self,
127 cx: &mut Context<'_>,
128 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
129 match self {
130 ResponseBody::Body(body) => body.poll_next_chunk(cx),
131 ResponseBody::Other(body) => body.poll_next_chunk(cx),
132 }
133 }
134}
135
136impl<B: MessageBody + Unpin> Stream for ResponseBody<B> {
137 type Item = Result<Bytes, Rc<dyn Error>>;
138
139 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140 match self.get_mut() {
141 ResponseBody::Body(body) => body.poll_next_chunk(cx),
142 ResponseBody::Other(body) => body.poll_next_chunk(cx),
143 }
144 }
145}
146
147pub enum Body {
149 None,
151 Empty,
153 Bytes(Bytes),
155 Message(Box<dyn MessageBody>),
157}
158
159impl Body {
160 pub fn from_slice(s: &[u8]) -> Body {
162 Body::Bytes(Bytes::copy_from_slice(s))
163 }
164
165 pub fn from_message<B: MessageBody>(body: B) -> Body {
167 Body::Message(Box::new(body))
168 }
169}
170
171impl MessageBody for Body {
172 #[inline]
173 fn size(&self) -> BodySize {
174 match self {
175 Body::None => BodySize::None,
176 Body::Empty => BodySize::Empty,
177 Body::Bytes(bin) => BodySize::Sized(bin.len() as u64),
178 Body::Message(body) => body.size(),
179 }
180 }
181
182 fn poll_next_chunk(
183 &mut self,
184 cx: &mut Context<'_>,
185 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
186 match self {
187 Body::None | Body::Empty => Poll::Ready(None),
188 Body::Bytes(bin) => {
189 let len = bin.len();
190 if len == 0 {
191 Poll::Ready(None)
192 } else {
193 Poll::Ready(Some(Ok(mem::take(bin))))
194 }
195 }
196 Body::Message(body) => body.poll_next_chunk(cx),
197 }
198 }
199}
200
201impl PartialEq for Body {
202 fn eq(&self, other: &Body) -> bool {
203 match (self, other) {
204 (Body::None, Body::None) | (Body::Empty, Body::Empty) => true,
205 (Body::Bytes(b), Body::Bytes(b2)) => b == b2,
206 _ => false,
207 }
208 }
209}
210
211impl fmt::Debug for Body {
212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213 match *self {
214 Body::None => write!(f, "Body::None"),
215 Body::Empty => write!(f, "Body::Empty"),
216 Body::Bytes(ref b) => write!(f, "Body::Bytes({b:?})"),
217 Body::Message(_) => write!(f, "Body::Message(_)"),
218 }
219 }
220}
221
222impl From<&'static str> for Body {
223 fn from(s: &'static str) -> Body {
224 Body::Bytes(Bytes::from_static(s.as_ref()))
225 }
226}
227
228impl From<&'static [u8]> for Body {
229 fn from(s: &'static [u8]) -> Body {
230 Body::Bytes(Bytes::from_static(s))
231 }
232}
233
234impl From<Vec<u8>> for Body {
235 fn from(vec: Vec<u8>) -> Body {
236 Body::Bytes(Bytes::from(vec))
237 }
238}
239
240impl From<String> for Body {
241 fn from(s: String) -> Body {
242 s.into_bytes().into()
243 }
244}
245
246impl<'a> From<&'a String> for Body {
247 fn from(s: &'a String) -> Body {
248 Body::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
249 }
250}
251
252impl From<Bytes> for Body {
253 fn from(s: Bytes) -> Body {
254 Body::Bytes(s)
255 }
256}
257
258impl From<BytesMut> for Body {
259 fn from(s: BytesMut) -> Body {
260 Body::Bytes(s.freeze())
261 }
262}
263
264impl From<BytePages> for Body {
265 fn from(pages: BytePages) -> Body {
266 Body::from_message(pages)
267 }
268}
269
270impl<S> From<SizedStream<S>> for Body
271where
272 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
273{
274 fn from(s: SizedStream<S>) -> Body {
275 Body::from_message(s)
276 }
277}
278
279impl<S, E> From<BodyStream<S, E>> for Body
280where
281 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
282 E: Error + 'static,
283{
284 fn from(s: BodyStream<S, E>) -> Body {
285 Body::from_message(s)
286 }
287}
288
289impl<S> From<BoxedBodyStream<S>> for Body
290where
291 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
292{
293 fn from(s: BoxedBodyStream<S>) -> Body {
294 Body::from_message(s)
295 }
296}
297
298impl MessageBody for Bytes {
299 fn size(&self) -> BodySize {
300 BodySize::Sized(self.len() as u64)
301 }
302
303 fn poll_next_chunk(
304 &mut self,
305 _: &mut Context<'_>,
306 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
307 if self.is_empty() {
308 Poll::Ready(None)
309 } else {
310 Poll::Ready(Some(Ok(mem::take(self))))
311 }
312 }
313}
314
315impl MessageBody for BytesMut {
316 fn size(&self) -> BodySize {
317 BodySize::Sized(self.len() as u64)
318 }
319
320 fn poll_next_chunk(
321 &mut self,
322 _: &mut Context<'_>,
323 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
324 if self.is_empty() {
325 Poll::Ready(None)
326 } else {
327 Poll::Ready(Some(Ok(mem::take(self).freeze())))
328 }
329 }
330}
331
332impl MessageBody for BytePages {
333 fn size(&self) -> BodySize {
334 BodySize::Sized(self.len() as u64)
335 }
336
337 fn poll_next_chunk(
338 &mut self,
339 _: &mut Context<'_>,
340 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
341 if let Some(page) = self.take() {
342 Poll::Ready(Some(Ok(page.freeze())))
343 } else {
344 Poll::Ready(None)
345 }
346 }
347}
348
349impl MessageBody for &'static str {
350 fn size(&self) -> BodySize {
351 BodySize::Sized(self.len() as u64)
352 }
353
354 fn poll_next_chunk(
355 &mut self,
356 _: &mut Context<'_>,
357 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
358 if self.is_empty() {
359 Poll::Ready(None)
360 } else {
361 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self).as_ref()))))
362 }
363 }
364}
365
366impl MessageBody for &'static [u8] {
367 fn size(&self) -> BodySize {
368 BodySize::Sized(self.len() as u64)
369 }
370
371 fn poll_next_chunk(
372 &mut self,
373 _: &mut Context<'_>,
374 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
375 if self.is_empty() {
376 Poll::Ready(None)
377 } else {
378 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self)))))
379 }
380 }
381}
382
383impl MessageBody for Vec<u8> {
384 fn size(&self) -> BodySize {
385 BodySize::Sized(self.len() as u64)
386 }
387
388 fn poll_next_chunk(
389 &mut self,
390 _: &mut Context<'_>,
391 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
392 if self.is_empty() {
393 Poll::Ready(None)
394 } else {
395 Poll::Ready(Some(Ok(Bytes::from(mem::take(self)))))
396 }
397 }
398}
399
400impl MessageBody for String {
401 fn size(&self) -> BodySize {
402 BodySize::Sized(self.len() as u64)
403 }
404
405 fn poll_next_chunk(
406 &mut self,
407 _: &mut Context<'_>,
408 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
409 if self.is_empty() {
410 Poll::Ready(None)
411 } else {
412 Poll::Ready(Some(Ok(Bytes::from(mem::take(self).into_bytes()))))
413 }
414 }
415}
416
417pub struct BodyStream<S, E> {
421 stream: S,
422 _t: marker::PhantomData<E>,
423}
424
425impl<S, E> BodyStream<S, E>
426where
427 S: Stream<Item = Result<Bytes, E>> + Unpin,
428 E: Error,
429{
430 pub fn new(stream: S) -> Self {
431 BodyStream {
432 stream,
433 _t: marker::PhantomData,
434 }
435 }
436}
437
438impl<S, E> fmt::Debug for BodyStream<S, E> {
439 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
440 f.debug_struct("BodyStream")
441 .field("stream", &std::any::type_name::<S>())
442 .field("error", &std::any::type_name::<E>())
443 .finish()
444 }
445}
446
447impl<S, E> MessageBody for BodyStream<S, E>
448where
449 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
450 E: Error + 'static,
451{
452 fn size(&self) -> BodySize {
453 BodySize::Stream
454 }
455
456 fn poll_next_chunk(
462 &mut self,
463 cx: &mut Context<'_>,
464 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
465 loop {
466 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
467 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
468 Poll::Ready(opt) => opt.map(|res| {
469 res.map_err(|e| {
470 let e: Rc<dyn Error> = Rc::new(e);
471 e
472 })
473 }),
474 Poll::Pending => return Poll::Pending,
475 });
476 }
477 }
478}
479
480pub struct BoxedBodyStream<S> {
483 stream: S,
484}
485
486impl<S> BoxedBodyStream<S>
487where
488 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
489{
490 pub fn new(stream: S) -> Self {
491 BoxedBodyStream { stream }
492 }
493}
494
495impl<S> fmt::Debug for BoxedBodyStream<S> {
496 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
497 f.debug_struct("BoxedBodyStream")
498 .field("stream", &std::any::type_name::<S>())
499 .finish()
500 }
501}
502
503impl<S> MessageBody for BoxedBodyStream<S>
504where
505 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
506{
507 fn size(&self) -> BodySize {
508 BodySize::Stream
509 }
510
511 fn poll_next_chunk(
517 &mut self,
518 cx: &mut Context<'_>,
519 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
520 loop {
521 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
522 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
523 Poll::Ready(opt) => opt,
524 Poll::Pending => return Poll::Pending,
525 });
526 }
527 }
528}
529
530pub struct SizedStream<S> {
533 size: u64,
534 stream: S,
535}
536
537impl<S> SizedStream<S>
538where
539 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
540{
541 pub fn new(size: u64, stream: S) -> Self {
542 SizedStream { size, stream }
543 }
544}
545
546impl<S> fmt::Debug for SizedStream<S> {
547 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548 f.debug_struct("SizedStream")
549 .field("size", &self.size)
550 .field("stream", &std::any::type_name::<S>())
551 .finish()
552 }
553}
554
555impl<S> MessageBody for SizedStream<S>
556where
557 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
558{
559 fn size(&self) -> BodySize {
560 BodySize::Sized(self.size)
561 }
562
563 fn poll_next_chunk(
569 &mut self,
570 cx: &mut Context<'_>,
571 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
572 loop {
573 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
574 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
575 Poll::Ready(val) => val,
576 Poll::Pending => return Poll::Pending,
577 });
578 }
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use std::{future::poll_fn, io};
585
586 use futures_util::stream;
587 use ntex::util::Ready;
588
589 use super::*;
590
591 impl Body {
592 pub(crate) fn get_ref(&self) -> &[u8] {
593 if let Body::Bytes(bin) = self { bin } else { panic!() }
594 }
595 }
596
597 #[ntex::test]
598 async fn test_size() {
599 assert_eq!(32, std::mem::size_of::<Body>());
600 assert_eq!(32, std::mem::size_of::<ResponseBody<Bytes>>());
601 assert_eq!(40, std::mem::size_of::<ResponseBody<Body>>());
602 }
603
604 #[ntex::test]
605 async fn test_static_str() {
606 assert_eq!(Body::from("").size(), BodySize::Sized(0));
607 assert_eq!(Body::from("test").size(), BodySize::Sized(4));
608 assert_eq!(Body::from("test").get_ref(), b"test");
609
610 assert_eq!("test".size(), BodySize::Sized(4));
611 assert_eq!(
612 poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
613 Some(Bytes::from("test"))
614 );
615 assert_eq!(
616 poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
617 Some(Bytes::from("test"))
618 );
619 assert!(poll_fn(|cx| "".poll_next_chunk(cx)).await.is_none());
620 }
621
622 #[ntex::test]
623 async fn test_pages() {
624 let mut pages = BytePages::default();
625 pages.append(Bytes::from("1111"));
626 pages.append(Bytes::from("2222"));
627 pages.append(Bytes::from("3333"));
628
629 let mut body = Body::from(pages);
630 assert_eq!(body.size(), BodySize::Sized(12));
631 assert_eq!(
632 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
633 Some(Bytes::from("111122223333"))
634 );
635 assert!(poll_fn(|cx| body.poll_next_chunk(cx)).await.is_none());
636 }
637
638 #[ntex::test]
639 async fn test_static_bytes() {
640 assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
641 assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
642 assert_eq!(
643 Body::from_slice(b"test".as_ref()).size(),
644 BodySize::Sized(4)
645 );
646 assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
647
648 assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
649 assert_eq!(
650 poll_fn(|cx| (&b"test"[..]).poll_next_chunk(cx))
651 .await
652 .unwrap()
653 .ok(),
654 Some(Bytes::from("test"))
655 );
656 assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
657 assert!(poll_fn(|cx| (&b""[..]).poll_next_chunk(cx)).await.is_none());
658 }
659
660 #[ntex::test]
661 async fn test_vec() {
662 assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
663 assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");
664
665 assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
666 assert_eq!(
667 poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
668 .await
669 .unwrap()
670 .ok(),
671 Some(Bytes::from("test"))
672 );
673 assert_eq!(
674 poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
675 .await
676 .unwrap()
677 .ok(),
678 Some(Bytes::from("test"))
679 );
680 assert!(
681 poll_fn(|cx| Vec::<u8>::new().poll_next_chunk(cx))
682 .await
683 .is_none()
684 );
685 }
686
687 #[ntex::test]
688 async fn test_bytes() {
689 let mut b = Bytes::from("test");
690 assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
691 assert_eq!(Body::from(b.clone()).get_ref(), b"test");
692
693 assert_eq!(b.size(), BodySize::Sized(4));
694 assert_eq!(
695 poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
696 Some(Bytes::from("test"))
697 );
698 assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
699 }
700
701 #[ntex::test]
702 async fn test_bytes_mut() {
703 let mut b = Body::from(BytesMut::from("test"));
704 assert_eq!(b.size(), BodySize::Sized(4));
705 assert_eq!(b.get_ref(), b"test");
706 assert_eq!(
707 poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
708 Some(Bytes::from("test"))
709 );
710 assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
711
712 let mut b = BytesMut::from("test");
713 assert_eq!(b.size(), BodySize::Sized(4));
714 assert_eq!(
715 poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
716 Some(Bytes::from("test"))
717 );
718 assert_eq!(b.size(), BodySize::Sized(0));
719 assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
720 }
721
722 #[ntex::test]
723 async fn test_string() {
724 let mut b = "test".to_owned();
725 assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
726 assert_eq!(Body::from(b.clone()).get_ref(), b"test");
727 assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
728 assert_eq!(Body::from(&b).get_ref(), b"test");
729
730 assert_eq!(b.size(), BodySize::Sized(4));
731 assert_eq!(
732 poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
733 Some(Bytes::from("test"))
734 );
735 assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
736 }
737
738 #[ntex::test]
739 async fn test_unit() {
740 assert_eq!(().size(), BodySize::Empty);
741 assert!(poll_fn(|cx| ().poll_next_chunk(cx)).await.is_none());
742 }
743
744 #[ntex::test]
745 async fn test_box() {
746 let mut val = Box::new(());
747 assert_eq!(val.size(), BodySize::Empty);
748 assert!(poll_fn(|cx| val.poll_next_chunk(cx)).await.is_none());
749 }
750
751 #[ntex::test]
752 #[allow(clippy::eq_op)]
753 async fn test_body_eq() {
754 assert!(Body::None == Body::None);
755 assert!(Body::None != Body::Empty);
756 assert!(Body::Empty == Body::Empty);
757 assert!(Body::Empty != Body::None);
758 assert!(
759 Body::Bytes(Bytes::from_static(b"1")) == Body::Bytes(Bytes::from_static(b"1"))
760 );
761 assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
762 }
763
764 #[ntex::test]
765 async fn test_body_debug() {
766 assert!(format!("{:?}", Body::None).contains("Body::None"));
767 assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
768 assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains('1'));
769 }
770
771 #[ntex::test]
772 async fn body_stream() {
773 let st = BodyStream::new(stream::once(Ready::<_, io::Error>::Ok(Bytes::from("1"))));
774 assert!(format!("{st:?}").contains("BodyStream"));
775 let body: Body = st.into();
776 assert!(format!("{body:?}").contains("Body::Message(_)"));
777 assert!(body != Body::None);
778
779 let res = ResponseBody::new(body);
780 assert!(res.as_ref().is_some());
781 }
782
783 #[ntex::test]
784 async fn boxed_body_stream() {
785 let st = BoxedBodyStream::new(stream::once(Ready::<_, Rc<dyn Error>>::Ok(
786 Bytes::from("1"),
787 )));
788 assert!(format!("{st:?}").contains("BoxedBodyStream"));
789 let body: Body = st.into();
790 assert!(format!("{body:?}").contains("Body::Message(_)"));
791 assert!(body != Body::None);
792
793 let res = ResponseBody::new(body);
794 assert!(res.as_ref().is_some());
795 }
796
797 #[ntex::test]
798 async fn body_skips_empty_chunks() {
799 let mut body = BodyStream::new(stream::iter(
800 ["1", "", "2"]
801 .iter()
802 .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, io::Error>),
803 ));
804 assert_eq!(
805 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
806 Some(Bytes::from("1")),
807 );
808 assert_eq!(
809 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
810 Some(Bytes::from("2")),
811 );
812 }
813
814 #[ntex::test]
815 async fn sized_skips_empty_chunks() {
816 let mut body = SizedStream::new(
817 2,
818 stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
819 );
820 assert!(format!("{body:?}").contains("SizedStream"));
821 assert_eq!(
822 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
823 Some(Bytes::from("1")),
824 );
825 assert_eq!(
826 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
827 Some(Bytes::from("2")),
828 );
829 }
830}