Skip to main content

ntex_http/
body.rs

1//! Traits and structures to aid consuming and writing HTTP payloads.
2use std::{
3    error::Error, fmt, marker::PhantomData, mem, pin::Pin, rc::Rc, task::Context,
4    task::Poll,
5};
6
7use futures_core::Stream;
8use ntex_bytes::{Bytes, BytesMut};
9
10#[derive(Debug, PartialEq, Eq, Copy, Clone)]
11/// Body size hint
12pub enum BodySize {
13    None,
14    Empty,
15    Sized(u64),
16    Stream,
17}
18
19impl BodySize {
20    pub fn is_eof(&self) -> bool {
21        matches!(self, BodySize::None | BodySize::Empty | BodySize::Sized(0))
22    }
23}
24
25/// Interface for types that can be streamed to a peer.
26pub trait MessageBody: 'static {
27    /// Message body size hind
28    fn size(&self) -> BodySize;
29
30    fn poll_next_chunk(
31        &mut self,
32        cx: &mut Context<'_>,
33    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>>;
34}
35
36impl MessageBody for () {
37    #[inline]
38    fn size(&self) -> BodySize {
39        BodySize::Empty
40    }
41
42    #[inline]
43    fn poll_next_chunk(
44        &mut self,
45        _: &mut Context<'_>,
46    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
47        Poll::Ready(None)
48    }
49}
50
51impl<T: MessageBody> MessageBody for Box<T> {
52    #[inline]
53    fn size(&self) -> BodySize {
54        self.as_ref().size()
55    }
56
57    #[inline]
58    fn poll_next_chunk(
59        &mut self,
60        cx: &mut Context<'_>,
61    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
62        self.as_mut().poll_next_chunk(cx)
63    }
64}
65
66#[derive(Debug)]
67/// Represents http response body
68pub enum ResponseBody<B> {
69    Body(B),
70    Other(Body),
71}
72
73impl ResponseBody<Body> {
74    pub fn into_body<B>(self) -> ResponseBody<B> {
75        match self {
76            ResponseBody::Body(b) | ResponseBody::Other(b) => ResponseBody::Other(b),
77        }
78    }
79}
80
81impl From<ResponseBody<Body>> for Body {
82    fn from(b: ResponseBody<Body>) -> Self {
83        match b {
84            ResponseBody::Body(b) | ResponseBody::Other(b) => b,
85        }
86    }
87}
88
89impl<B> From<Body> for ResponseBody<B> {
90    fn from(b: Body) -> Self {
91        ResponseBody::Other(b)
92    }
93}
94
95impl<B> ResponseBody<B> {
96    #[inline]
97    pub fn new(body: B) -> Self {
98        ResponseBody::Body(body)
99    }
100
101    #[inline]
102    #[must_use]
103    pub fn take_body(&mut self) -> ResponseBody<B> {
104        std::mem::replace(self, ResponseBody::Other(Body::None))
105    }
106}
107
108impl<B: MessageBody> ResponseBody<B> {
109    pub fn as_ref(&self) -> Option<&B> {
110        if let ResponseBody::Body(b) = self {
111            Some(b)
112        } else {
113            None
114        }
115    }
116}
117
118impl<B: MessageBody> MessageBody for ResponseBody<B> {
119    #[inline]
120    fn size(&self) -> BodySize {
121        match self {
122            ResponseBody::Body(body) => body.size(),
123            ResponseBody::Other(body) => body.size(),
124        }
125    }
126
127    #[inline]
128    fn poll_next_chunk(
129        &mut self,
130        cx: &mut Context<'_>,
131    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
132        match self {
133            ResponseBody::Body(body) => body.poll_next_chunk(cx),
134            ResponseBody::Other(body) => body.poll_next_chunk(cx),
135        }
136    }
137}
138
139impl<B: MessageBody + Unpin> Stream for ResponseBody<B> {
140    type Item = Result<Bytes, Rc<dyn Error>>;
141
142    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143        match self.get_mut() {
144            ResponseBody::Body(body) => body.poll_next_chunk(cx),
145            ResponseBody::Other(body) => body.poll_next_chunk(cx),
146        }
147    }
148}
149
150/// Represents various types of http message body.
151pub enum Body {
152    /// Empty response. `Content-Length` header is not set.
153    None,
154    /// Zero sized response body. `Content-Length` header is set to `0`.
155    Empty,
156    /// Specific response body.
157    Bytes(Bytes),
158    /// Generic message body.
159    Message(Box<dyn MessageBody>),
160}
161
162impl Body {
163    /// Create body from slice (copy)
164    pub fn from_slice(s: &[u8]) -> Body {
165        Body::Bytes(Bytes::copy_from_slice(s))
166    }
167
168    /// Create body from generic message body.
169    pub fn from_message<B: MessageBody>(body: B) -> Body {
170        Body::Message(Box::new(body))
171    }
172}
173
174impl MessageBody for Body {
175    #[inline]
176    fn size(&self) -> BodySize {
177        match self {
178            Body::None => BodySize::None,
179            Body::Empty => BodySize::Empty,
180            Body::Bytes(bin) => BodySize::Sized(bin.len() as u64),
181            Body::Message(body) => body.size(),
182        }
183    }
184
185    fn poll_next_chunk(
186        &mut self,
187        cx: &mut Context<'_>,
188    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
189        match self {
190            Body::None | Body::Empty => Poll::Ready(None),
191            Body::Bytes(bin) => {
192                let len = bin.len();
193                if len == 0 {
194                    Poll::Ready(None)
195                } else {
196                    Poll::Ready(Some(Ok(mem::take(bin))))
197                }
198            }
199            Body::Message(body) => body.poll_next_chunk(cx),
200        }
201    }
202}
203
204impl PartialEq for Body {
205    fn eq(&self, other: &Body) -> bool {
206        match (self, other) {
207            (Body::None, Body::None) | (Body::Empty, Body::Empty) => true,
208            (Body::Bytes(b), Body::Bytes(b2)) => b == b2,
209            _ => false,
210        }
211    }
212}
213
214impl fmt::Debug for Body {
215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216        match *self {
217            Body::None => write!(f, "Body::None"),
218            Body::Empty => write!(f, "Body::Empty"),
219            Body::Bytes(ref b) => write!(f, "Body::Bytes({b:?})"),
220            Body::Message(_) => write!(f, "Body::Message(_)"),
221        }
222    }
223}
224
225impl From<&'static str> for Body {
226    fn from(s: &'static str) -> Body {
227        Body::Bytes(Bytes::from_static(s.as_ref()))
228    }
229}
230
231impl From<&'static [u8]> for Body {
232    fn from(s: &'static [u8]) -> Body {
233        Body::Bytes(Bytes::from_static(s))
234    }
235}
236
237impl From<Vec<u8>> for Body {
238    fn from(vec: Vec<u8>) -> Body {
239        Body::Bytes(Bytes::from(vec))
240    }
241}
242
243impl From<String> for Body {
244    fn from(s: String) -> Body {
245        s.into_bytes().into()
246    }
247}
248
249impl<'a> From<&'a String> for Body {
250    fn from(s: &'a String) -> Body {
251        Body::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
252    }
253}
254
255impl From<Bytes> for Body {
256    fn from(s: Bytes) -> Body {
257        Body::Bytes(s)
258    }
259}
260
261impl From<BytesMut> for Body {
262    fn from(s: BytesMut) -> Body {
263        Body::Bytes(s.freeze())
264    }
265}
266
267impl<S> From<SizedStream<S>> for Body
268where
269    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
270{
271    fn from(s: SizedStream<S>) -> Body {
272        Body::from_message(s)
273    }
274}
275
276impl<S, E> From<BodyStream<S, E>> for Body
277where
278    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
279    E: Error + 'static,
280{
281    fn from(s: BodyStream<S, E>) -> Body {
282        Body::from_message(s)
283    }
284}
285
286impl<S> From<BoxedBodyStream<S>> for Body
287where
288    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
289{
290    fn from(s: BoxedBodyStream<S>) -> Body {
291        Body::from_message(s)
292    }
293}
294
295impl MessageBody for Bytes {
296    fn size(&self) -> BodySize {
297        BodySize::Sized(self.len() as u64)
298    }
299
300    fn poll_next_chunk(
301        &mut self,
302        _: &mut Context<'_>,
303    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
304        if self.is_empty() {
305            Poll::Ready(None)
306        } else {
307            Poll::Ready(Some(Ok(mem::take(self))))
308        }
309    }
310}
311
312impl MessageBody for BytesMut {
313    fn size(&self) -> BodySize {
314        BodySize::Sized(self.len() as u64)
315    }
316
317    fn poll_next_chunk(
318        &mut self,
319        _: &mut Context<'_>,
320    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
321        if self.is_empty() {
322            Poll::Ready(None)
323        } else {
324            Poll::Ready(Some(Ok(mem::take(self).freeze())))
325        }
326    }
327}
328
329impl MessageBody for &'static str {
330    fn size(&self) -> BodySize {
331        BodySize::Sized(self.len() as u64)
332    }
333
334    fn poll_next_chunk(
335        &mut self,
336        _: &mut Context<'_>,
337    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
338        if self.is_empty() {
339            Poll::Ready(None)
340        } else {
341            Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self).as_ref()))))
342        }
343    }
344}
345
346impl MessageBody for &'static [u8] {
347    fn size(&self) -> BodySize {
348        BodySize::Sized(self.len() as u64)
349    }
350
351    fn poll_next_chunk(
352        &mut self,
353        _: &mut Context<'_>,
354    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
355        if self.is_empty() {
356            Poll::Ready(None)
357        } else {
358            Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self)))))
359        }
360    }
361}
362
363impl MessageBody for Vec<u8> {
364    fn size(&self) -> BodySize {
365        BodySize::Sized(self.len() as u64)
366    }
367
368    fn poll_next_chunk(
369        &mut self,
370        _: &mut Context<'_>,
371    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
372        if self.is_empty() {
373            Poll::Ready(None)
374        } else {
375            Poll::Ready(Some(Ok(Bytes::from(mem::take(self)))))
376        }
377    }
378}
379
380impl MessageBody for String {
381    fn size(&self) -> BodySize {
382        BodySize::Sized(self.len() as u64)
383    }
384
385    fn poll_next_chunk(
386        &mut self,
387        _: &mut Context<'_>,
388    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
389        if self.is_empty() {
390            Poll::Ready(None)
391        } else {
392            Poll::Ready(Some(Ok(Bytes::from(mem::take(self).into_bytes()))))
393        }
394    }
395}
396
397/// Type represent streaming body.
398///
399/// Response does not contain `content-length` header and appropriate transfer encoding is used.
400pub struct BodyStream<S, E> {
401    stream: S,
402    _t: PhantomData<E>,
403}
404
405impl<S, E> BodyStream<S, E>
406where
407    S: Stream<Item = Result<Bytes, E>> + Unpin,
408    E: Error,
409{
410    pub fn new(stream: S) -> Self {
411        BodyStream {
412            stream,
413            _t: PhantomData,
414        }
415    }
416}
417
418impl<S, E> fmt::Debug for BodyStream<S, E> {
419    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
420        f.debug_struct("BodyStream")
421            .field("stream", &std::any::type_name::<S>())
422            .field("error", &std::any::type_name::<E>())
423            .finish()
424    }
425}
426
427impl<S, E> MessageBody for BodyStream<S, E>
428where
429    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
430    E: Error + 'static,
431{
432    fn size(&self) -> BodySize {
433        BodySize::Stream
434    }
435
436    /// Attempts to pull out the next value of the underlying [`Stream`].
437    ///
438    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
439    /// ended on a zero-length chunk, but rather proceed until the underlying
440    /// [`Stream`] ends.
441    fn poll_next_chunk(
442        &mut self,
443        cx: &mut Context<'_>,
444    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
445        loop {
446            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
447                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
448                Poll::Ready(opt) => opt.map(|res| {
449                    res.map_err(|e| {
450                        let e: Rc<dyn Error> = Rc::new(e);
451                        e
452                    })
453                }),
454                Poll::Pending => return Poll::Pending,
455            });
456        }
457    }
458}
459
460/// Type represent streaming body.
461/// Response does not contain `content-length` header and appropriate transfer encoding is used.
462pub struct BoxedBodyStream<S> {
463    stream: S,
464}
465
466impl<S> BoxedBodyStream<S>
467where
468    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
469{
470    pub fn new(stream: S) -> Self {
471        BoxedBodyStream { stream }
472    }
473}
474
475impl<S> fmt::Debug for BoxedBodyStream<S> {
476    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
477        f.debug_struct("BoxedBodyStream")
478            .field("stream", &std::any::type_name::<S>())
479            .finish()
480    }
481}
482
483impl<S> MessageBody for BoxedBodyStream<S>
484where
485    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
486{
487    fn size(&self) -> BodySize {
488        BodySize::Stream
489    }
490
491    /// Attempts to pull out the next value of the underlying [`Stream`].
492    ///
493    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
494    /// ended on a zero-length chunk, but rather proceed until the underlying
495    /// [`Stream`] ends.
496    fn poll_next_chunk(
497        &mut self,
498        cx: &mut Context<'_>,
499    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
500        loop {
501            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
502                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
503                Poll::Ready(opt) => opt,
504                Poll::Pending => return Poll::Pending,
505            });
506        }
507    }
508}
509
510/// Type represent streaming body. This body implementation should be used
511/// if total size of stream is known. Data get sent as is without using transfer encoding.
512pub struct SizedStream<S> {
513    size: u64,
514    stream: S,
515}
516
517impl<S> SizedStream<S>
518where
519    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
520{
521    pub fn new(size: u64, stream: S) -> Self {
522        SizedStream { size, stream }
523    }
524}
525
526impl<S> fmt::Debug for SizedStream<S> {
527    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528        f.debug_struct("SizedStream")
529            .field("size", &self.size)
530            .field("stream", &std::any::type_name::<S>())
531            .finish()
532    }
533}
534
535impl<S> MessageBody for SizedStream<S>
536where
537    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
538{
539    fn size(&self) -> BodySize {
540        BodySize::Sized(self.size)
541    }
542
543    /// Attempts to pull out the next value of the underlying [`Stream`].
544    ///
545    /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
546    /// ended on a zero-length chunk, but rather proceed until the underlying
547    /// [`Stream`] ends.
548    fn poll_next_chunk(
549        &mut self,
550        cx: &mut Context<'_>,
551    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
552        loop {
553            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
554                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
555                Poll::Ready(val) => val,
556                Poll::Pending => return Poll::Pending,
557            });
558        }
559    }
560}
561
562#[cfg(test)]
563mod tests {
564    use std::{future::poll_fn, io};
565
566    use futures_util::stream;
567    use ntex::util::Ready;
568
569    use super::*;
570
571    impl Body {
572        pub(crate) fn get_ref(&self) -> &[u8] {
573            if let Body::Bytes(bin) = self { bin } else { panic!() }
574        }
575    }
576
577    #[ntex::test]
578    async fn test_static_str() {
579        assert_eq!(Body::from("").size(), BodySize::Sized(0));
580        assert_eq!(Body::from("test").size(), BodySize::Sized(4));
581        assert_eq!(Body::from("test").get_ref(), b"test");
582
583        assert_eq!("test".size(), BodySize::Sized(4));
584        assert_eq!(
585            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
586            Some(Bytes::from("test"))
587        );
588        assert_eq!(
589            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
590            Some(Bytes::from("test"))
591        );
592        assert!(poll_fn(|cx| "".poll_next_chunk(cx)).await.is_none());
593    }
594
595    #[ntex::test]
596    async fn test_static_bytes() {
597        assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
598        assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
599        assert_eq!(
600            Body::from_slice(b"test".as_ref()).size(),
601            BodySize::Sized(4)
602        );
603        assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
604
605        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
606        assert_eq!(
607            poll_fn(|cx| (&b"test"[..]).poll_next_chunk(cx))
608                .await
609                .unwrap()
610                .ok(),
611            Some(Bytes::from("test"))
612        );
613        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
614        assert!(poll_fn(|cx| (&b""[..]).poll_next_chunk(cx)).await.is_none());
615    }
616
617    #[ntex::test]
618    async fn test_vec() {
619        assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
620        assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");
621
622        assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
623        assert_eq!(
624            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
625                .await
626                .unwrap()
627                .ok(),
628            Some(Bytes::from("test"))
629        );
630        assert_eq!(
631            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
632                .await
633                .unwrap()
634                .ok(),
635            Some(Bytes::from("test"))
636        );
637        assert!(
638            poll_fn(|cx| Vec::<u8>::new().poll_next_chunk(cx))
639                .await
640                .is_none()
641        );
642    }
643
644    #[ntex::test]
645    async fn test_bytes() {
646        let mut b = Bytes::from("test");
647        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
648        assert_eq!(Body::from(b.clone()).get_ref(), b"test");
649
650        assert_eq!(b.size(), BodySize::Sized(4));
651        assert_eq!(
652            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
653            Some(Bytes::from("test"))
654        );
655        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
656    }
657
658    #[ntex::test]
659    async fn test_bytes_mut() {
660        let mut b = Body::from(BytesMut::from("test"));
661        assert_eq!(b.size(), BodySize::Sized(4));
662        assert_eq!(b.get_ref(), b"test");
663        assert_eq!(
664            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
665            Some(Bytes::from("test"))
666        );
667        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
668
669        let mut b = BytesMut::from("test");
670        assert_eq!(b.size(), BodySize::Sized(4));
671        assert_eq!(
672            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
673            Some(Bytes::from("test"))
674        );
675        assert_eq!(b.size(), BodySize::Sized(0));
676        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
677    }
678
679    #[ntex::test]
680    async fn test_string() {
681        let mut b = "test".to_owned();
682        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
683        assert_eq!(Body::from(b.clone()).get_ref(), b"test");
684        assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
685        assert_eq!(Body::from(&b).get_ref(), b"test");
686
687        assert_eq!(b.size(), BodySize::Sized(4));
688        assert_eq!(
689            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
690            Some(Bytes::from("test"))
691        );
692        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
693    }
694
695    #[ntex::test]
696    async fn test_unit() {
697        assert_eq!(().size(), BodySize::Empty);
698        assert!(poll_fn(|cx| ().poll_next_chunk(cx)).await.is_none());
699    }
700
701    #[ntex::test]
702    async fn test_box() {
703        let mut val = Box::new(());
704        assert_eq!(val.size(), BodySize::Empty);
705        assert!(poll_fn(|cx| val.poll_next_chunk(cx)).await.is_none());
706    }
707
708    #[ntex::test]
709    #[allow(clippy::eq_op)]
710    async fn test_body_eq() {
711        assert!(Body::None == Body::None);
712        assert!(Body::None != Body::Empty);
713        assert!(Body::Empty == Body::Empty);
714        assert!(Body::Empty != Body::None);
715        assert!(
716            Body::Bytes(Bytes::from_static(b"1")) == Body::Bytes(Bytes::from_static(b"1"))
717        );
718        assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
719    }
720
721    #[ntex::test]
722    async fn test_body_debug() {
723        assert!(format!("{:?}", Body::None).contains("Body::None"));
724        assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
725        assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains('1'));
726    }
727
728    #[ntex::test]
729    async fn body_stream() {
730        let st = BodyStream::new(stream::once(Ready::<_, io::Error>::Ok(Bytes::from("1"))));
731        assert!(format!("{st:?}").contains("BodyStream"));
732        let body: Body = st.into();
733        assert!(format!("{body:?}").contains("Body::Message(_)"));
734        assert!(body != Body::None);
735
736        let res = ResponseBody::new(body);
737        assert!(res.as_ref().is_some());
738    }
739
740    #[ntex::test]
741    async fn boxed_body_stream() {
742        let st = BoxedBodyStream::new(stream::once(Ready::<_, Rc<dyn Error>>::Ok(
743            Bytes::from("1"),
744        )));
745        assert!(format!("{st:?}").contains("BoxedBodyStream"));
746        let body: Body = st.into();
747        assert!(format!("{body:?}").contains("Body::Message(_)"));
748        assert!(body != Body::None);
749
750        let res = ResponseBody::new(body);
751        assert!(res.as_ref().is_some());
752    }
753
754    #[ntex::test]
755    async fn body_skips_empty_chunks() {
756        let mut body = BodyStream::new(stream::iter(
757            ["1", "", "2"]
758                .iter()
759                .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, io::Error>),
760        ));
761        assert_eq!(
762            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
763            Some(Bytes::from("1")),
764        );
765        assert_eq!(
766            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
767            Some(Bytes::from("2")),
768        );
769    }
770
771    #[ntex::test]
772    async fn sized_skips_empty_chunks() {
773        let mut body = SizedStream::new(
774            2,
775            stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
776        );
777        assert!(format!("{body:?}").contains("SizedStream"));
778        assert_eq!(
779            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
780            Some(Bytes::from("1")),
781        );
782        assert_eq!(
783            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
784            Some(Bytes::from("2")),
785        );
786    }
787}