ntex_http/
body.rs

1//! Traits and structures to aid consuming and writing HTTP payloads.
2use std::{
3    error::Error, fmt, marker::PhantomData, mem, pin::Pin, task::Context, task::Poll,
4};
5
6use futures_core::Stream;
7use ntex_bytes::{Bytes, BytesMut};
8
/// Body size hint
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum BodySize {
    /// No body.
    None,
    /// Body of zero length.
    Empty,
    /// Body whose length in bytes is known.
    Sized(u64),
    /// Streamed body.
    Stream,
}

impl BodySize {
    /// Returns `true` for `None`, `Empty` and `Sized(0)`, i.e. whenever
    /// the hint says there are no bytes to transfer.
    pub fn is_eof(&self) -> bool {
        match *self {
            BodySize::None | BodySize::Empty | BodySize::Sized(0) => true,
            BodySize::Sized(_) | BodySize::Stream => false,
        }
    }
}
23
/// Interface for types that can be streamed to a peer.
pub trait MessageBody: 'static {
    /// Message body size hint.
    fn size(&self) -> BodySize;

    /// Polls for the next chunk of the body.
    ///
    /// The implementations in this module return `Poll::Ready(None)` once
    /// there is no more data, `Poll::Ready(Some(Ok(chunk)))` for a chunk of
    /// data and `Poll::Ready(Some(Err(_)))` when the underlying source fails.
    fn poll_next_chunk(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>>;
}
34
35impl MessageBody for () {
36    #[inline]
37    fn size(&self) -> BodySize {
38        BodySize::Empty
39    }
40
41    #[inline]
42    fn poll_next_chunk(
43        &mut self,
44        _: &mut Context<'_>,
45    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
46        Poll::Ready(None)
47    }
48}
49
50impl<T: MessageBody> MessageBody for Box<T> {
51    #[inline]
52    fn size(&self) -> BodySize {
53        self.as_ref().size()
54    }
55
56    #[inline]
57    fn poll_next_chunk(
58        &mut self,
59        cx: &mut Context<'_>,
60    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
61        self.as_mut().poll_next_chunk(cx)
62    }
63}
64
#[derive(Debug)]
/// Represents http response body
pub enum ResponseBody<B> {
    /// Body of the generic type `B`.
    Body(B),
    /// Body stored as a [`Body`] value.
    Other(Body),
}
71
72impl ResponseBody<Body> {
73    pub fn into_body<B>(self) -> ResponseBody<B> {
74        match self {
75            ResponseBody::Body(b) => ResponseBody::Other(b),
76            ResponseBody::Other(b) => ResponseBody::Other(b),
77        }
78    }
79}
80
81impl From<ResponseBody<Body>> for Body {
82    fn from(b: ResponseBody<Body>) -> Self {
83        match b {
84            ResponseBody::Body(b) => b,
85            ResponseBody::Other(b) => b,
86        }
87    }
88}
89
90impl<B> From<Body> for ResponseBody<B> {
91    fn from(b: Body) -> Self {
92        ResponseBody::Other(b)
93    }
94}
95
96impl<B> ResponseBody<B> {
97    #[inline]
98    pub fn new(body: B) -> Self {
99        ResponseBody::Body(body)
100    }
101
102    #[inline]
103    pub fn take_body(&mut self) -> ResponseBody<B> {
104        std::mem::replace(self, ResponseBody::Other(Body::None))
105    }
106}
107
108impl<B: MessageBody> ResponseBody<B> {
109    pub fn as_ref(&self) -> Option<&B> {
110        if let ResponseBody::Body(ref b) = self {
111            Some(b)
112        } else {
113            None
114        }
115    }
116}
117
118impl<B: MessageBody> MessageBody for ResponseBody<B> {
119    #[inline]
120    fn size(&self) -> BodySize {
121        match self {
122            ResponseBody::Body(ref body) => body.size(),
123            ResponseBody::Other(ref body) => body.size(),
124        }
125    }
126
127    #[inline]
128    fn poll_next_chunk(
129        &mut self,
130        cx: &mut Context<'_>,
131    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
132        match self {
133            ResponseBody::Body(ref mut body) => body.poll_next_chunk(cx),
134            ResponseBody::Other(ref mut body) => body.poll_next_chunk(cx),
135        }
136    }
137}
138
139impl<B: MessageBody + Unpin> Stream for ResponseBody<B> {
140    type Item = Result<Bytes, Box<dyn Error>>;
141
142    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143        match self.get_mut() {
144            ResponseBody::Body(ref mut body) => body.poll_next_chunk(cx),
145            ResponseBody::Other(ref mut body) => body.poll_next_chunk(cx),
146        }
147    }
148}
149
/// Represents various types of http message body.
pub enum Body {
    /// Empty response. `Content-Length` header is not set.
    None,
    /// Zero sized response body. `Content-Length` header is set to `0`.
    Empty,
    /// Specific response body held in memory as [`Bytes`].
    Bytes(Bytes),
    /// Generic message body: any boxed [`MessageBody`] implementation.
    Message(Box<dyn MessageBody>),
}
161
162impl Body {
163    /// Create body from slice (copy)
164    pub fn from_slice(s: &[u8]) -> Body {
165        Body::Bytes(Bytes::copy_from_slice(s))
166    }
167
168    /// Create body from generic message body.
169    pub fn from_message<B: MessageBody>(body: B) -> Body {
170        Body::Message(Box::new(body))
171    }
172}
173
174impl MessageBody for Body {
175    #[inline]
176    fn size(&self) -> BodySize {
177        match self {
178            Body::None => BodySize::None,
179            Body::Empty => BodySize::Empty,
180            Body::Bytes(ref bin) => BodySize::Sized(bin.len() as u64),
181            Body::Message(ref body) => body.size(),
182        }
183    }
184
185    fn poll_next_chunk(
186        &mut self,
187        cx: &mut Context<'_>,
188    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
189        match self {
190            Body::None => Poll::Ready(None),
191            Body::Empty => Poll::Ready(None),
192            Body::Bytes(ref mut bin) => {
193                let len = bin.len();
194                if len == 0 {
195                    Poll::Ready(None)
196                } else {
197                    Poll::Ready(Some(Ok(mem::take(bin))))
198                }
199            }
200            Body::Message(ref mut body) => body.poll_next_chunk(cx),
201        }
202    }
203}
204
205impl PartialEq for Body {
206    fn eq(&self, other: &Body) -> bool {
207        match (self, other) {
208            (Body::None, Body::None) => true,
209            (Body::Empty, Body::Empty) => true,
210            (Body::Bytes(ref b), Body::Bytes(ref b2)) => b == b2,
211            _ => false,
212        }
213    }
214}
215
216impl fmt::Debug for Body {
217    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218        match *self {
219            Body::None => write!(f, "Body::None"),
220            Body::Empty => write!(f, "Body::Empty"),
221            Body::Bytes(ref b) => write!(f, "Body::Bytes({:?})", b),
222            Body::Message(_) => write!(f, "Body::Message(_)"),
223        }
224    }
225}
226
227impl From<&'static str> for Body {
228    fn from(s: &'static str) -> Body {
229        Body::Bytes(Bytes::from_static(s.as_ref()))
230    }
231}
232
233impl From<&'static [u8]> for Body {
234    fn from(s: &'static [u8]) -> Body {
235        Body::Bytes(Bytes::from_static(s))
236    }
237}
238
239impl From<Vec<u8>> for Body {
240    fn from(vec: Vec<u8>) -> Body {
241        Body::Bytes(Bytes::from(vec))
242    }
243}
244
245impl From<String> for Body {
246    fn from(s: String) -> Body {
247        s.into_bytes().into()
248    }
249}
250
251impl<'a> From<&'a String> for Body {
252    fn from(s: &'a String) -> Body {
253        Body::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
254    }
255}
256
257impl From<Bytes> for Body {
258    fn from(s: Bytes) -> Body {
259        Body::Bytes(s)
260    }
261}
262
263impl From<BytesMut> for Body {
264    fn from(s: BytesMut) -> Body {
265        Body::Bytes(s.freeze())
266    }
267}
268
269impl<S> From<SizedStream<S>> for Body
270where
271    S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin + 'static,
272{
273    fn from(s: SizedStream<S>) -> Body {
274        Body::from_message(s)
275    }
276}
277
278impl<S, E> From<BodyStream<S, E>> for Body
279where
280    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
281    E: Error + 'static,
282{
283    fn from(s: BodyStream<S, E>) -> Body {
284        Body::from_message(s)
285    }
286}
287
288impl<S> From<BoxedBodyStream<S>> for Body
289where
290    S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin + 'static,
291{
292    fn from(s: BoxedBodyStream<S>) -> Body {
293        Body::from_message(s)
294    }
295}
296
297impl MessageBody for Bytes {
298    fn size(&self) -> BodySize {
299        BodySize::Sized(self.len() as u64)
300    }
301
302    fn poll_next_chunk(
303        &mut self,
304        _: &mut Context<'_>,
305    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
306        if self.is_empty() {
307            Poll::Ready(None)
308        } else {
309            Poll::Ready(Some(Ok(mem::take(self))))
310        }
311    }
312}
313
314impl MessageBody for BytesMut {
315    fn size(&self) -> BodySize {
316        BodySize::Sized(self.len() as u64)
317    }
318
319    fn poll_next_chunk(
320        &mut self,
321        _: &mut Context<'_>,
322    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
323        if self.is_empty() {
324            Poll::Ready(None)
325        } else {
326            Poll::Ready(Some(Ok(mem::take(self).freeze())))
327        }
328    }
329}
330
331impl MessageBody for &'static str {
332    fn size(&self) -> BodySize {
333        BodySize::Sized(self.len() as u64)
334    }
335
336    fn poll_next_chunk(
337        &mut self,
338        _: &mut Context<'_>,
339    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
340        if self.is_empty() {
341            Poll::Ready(None)
342        } else {
343            Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self).as_ref()))))
344        }
345    }
346}
347
348impl MessageBody for &'static [u8] {
349    fn size(&self) -> BodySize {
350        BodySize::Sized(self.len() as u64)
351    }
352
353    fn poll_next_chunk(
354        &mut self,
355        _: &mut Context<'_>,
356    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
357        if self.is_empty() {
358            Poll::Ready(None)
359        } else {
360            Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self)))))
361        }
362    }
363}
364
365impl MessageBody for Vec<u8> {
366    fn size(&self) -> BodySize {
367        BodySize::Sized(self.len() as u64)
368    }
369
370    fn poll_next_chunk(
371        &mut self,
372        _: &mut Context<'_>,
373    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
374        if self.is_empty() {
375            Poll::Ready(None)
376        } else {
377            Poll::Ready(Some(Ok(Bytes::from(mem::take(self)))))
378        }
379    }
380}
381
382impl MessageBody for String {
383    fn size(&self) -> BodySize {
384        BodySize::Sized(self.len() as u64)
385    }
386
387    fn poll_next_chunk(
388        &mut self,
389        _: &mut Context<'_>,
390    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
391        if self.is_empty() {
392            Poll::Ready(None)
393        } else {
394            Poll::Ready(Some(Ok(Bytes::from(mem::take(self).into_bytes()))))
395        }
396    }
397}
398
/// Type representing a streaming body.
///
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
pub struct BodyStream<S, E> {
    // Underlying stream the chunks are pulled from.
    stream: S,
    // Marker for the stream's error type `E`; no value of `E` is stored.
    _t: PhantomData<E>,
}
406
407impl<S, E> BodyStream<S, E>
408where
409    S: Stream<Item = Result<Bytes, E>> + Unpin,
410    E: Error,
411{
412    pub fn new(stream: S) -> Self {
413        BodyStream {
414            stream,
415            _t: PhantomData,
416        }
417    }
418}
419
420impl<S, E> fmt::Debug for BodyStream<S, E> {
421    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422        f.debug_struct("BodyStream")
423            .field("stream", &std::any::type_name::<S>())
424            .field("error", &std::any::type_name::<E>())
425            .finish()
426    }
427}
428
429impl<S, E> MessageBody for BodyStream<S, E>
430where
431    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
432    E: Error + 'static,
433{
434    fn size(&self) -> BodySize {
435        BodySize::Stream
436    }
437
438    /// Attempts to pull out the next value of the underlying [`Stream`].
439    ///
440    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
441    /// ended on a zero-length chunk, but rather proceed until the underlying
442    /// [`Stream`] ends.
443    fn poll_next_chunk(
444        &mut self,
445        cx: &mut Context<'_>,
446    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
447        loop {
448            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
449                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
450                Poll::Ready(opt) => opt.map(|res| res.map_err(Into::into)),
451                Poll::Pending => return Poll::Pending,
452            });
453        }
454    }
455}
456
/// Type representing a streaming body.
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
pub struct BoxedBodyStream<S> {
    // Underlying stream the chunks are pulled from.
    stream: S,
}
462
463impl<S> BoxedBodyStream<S>
464where
465    S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin,
466{
467    pub fn new(stream: S) -> Self {
468        BoxedBodyStream { stream }
469    }
470}
471
472impl<S> fmt::Debug for BoxedBodyStream<S> {
473    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
474        f.debug_struct("BoxedBodyStream")
475            .field("stream", &std::any::type_name::<S>())
476            .finish()
477    }
478}
479
480impl<S> MessageBody for BoxedBodyStream<S>
481where
482    S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin + 'static,
483{
484    fn size(&self) -> BodySize {
485        BodySize::Stream
486    }
487
488    /// Attempts to pull out the next value of the underlying [`Stream`].
489    ///
490    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
491    /// ended on a zero-length chunk, but rather proceed until the underlying
492    /// [`Stream`] ends.
493    fn poll_next_chunk(
494        &mut self,
495        cx: &mut Context<'_>,
496    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
497        loop {
498            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
499                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
500                Poll::Ready(opt) => opt,
501                Poll::Pending => return Poll::Pending,
502            });
503        }
504    }
505}
506
/// Type representing a streaming body. This body implementation should be used
/// if the total size of the stream is known. Data gets sent as is without using transfer encoding.
pub struct SizedStream<S> {
    // Size supplied by the caller; reported unchanged as the size hint.
    size: u64,
    // Underlying stream the chunks are pulled from.
    stream: S,
}
513
514impl<S> SizedStream<S>
515where
516    S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin,
517{
518    pub fn new(size: u64, stream: S) -> Self {
519        SizedStream { size, stream }
520    }
521}
522
523impl<S> fmt::Debug for SizedStream<S> {
524    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525        f.debug_struct("SizedStream")
526            .field("size", &self.size)
527            .field("stream", &std::any::type_name::<S>())
528            .finish()
529    }
530}
531
532impl<S> MessageBody for SizedStream<S>
533where
534    S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin + 'static,
535{
536    fn size(&self) -> BodySize {
537        BodySize::Sized(self.size)
538    }
539
540    /// Attempts to pull out the next value of the underlying [`Stream`].
541    ///
542    /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
543    /// ended on a zero-length chunk, but rather proceed until the underlying
544    /// [`Stream`] ends.
545    fn poll_next_chunk(
546        &mut self,
547        cx: &mut Context<'_>,
548    ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
549        loop {
550            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
551                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
552                Poll::Ready(val) => val,
553                Poll::Pending => return Poll::Pending,
554            });
555        }
556    }
557}
558
// Unit tests for the body types and conversions defined in this module.
#[cfg(test)]
mod tests {
    use std::{future::poll_fn, io};

    use futures_util::stream;
    use ntex_util::future::Ready;

    use super::*;

    impl Body {
        // Test-only accessor: returns the bytes of a `Body::Bytes` and
        // panics for every other variant.
        pub(crate) fn get_ref(&self) -> &[u8] {
            match *self {
                Body::Bytes(ref bin) => bin,
                _ => panic!(),
            }
        }
    }

    // `&'static str`: conversion into `Body` and direct use as a `MessageBody`.
    #[ntex::test]
    async fn test_static_str() {
        assert_eq!(Body::from("").size(), BodySize::Sized(0));
        assert_eq!(Body::from("test").size(), BodySize::Sized(4));
        assert_eq!(Body::from("test").get_ref(), b"test");

        assert_eq!("test".size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(
            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        // An empty string yields no chunk at all.
        assert!(poll_fn(|cx| "".poll_next_chunk(cx)).await.is_none());
    }

    // `&'static [u8]`: conversion into `Body`, `Body::from_slice`, and direct
    // use as a `MessageBody`.
    #[ntex::test]
    async fn test_static_bytes() {
        assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
        assert_eq!(
            Body::from_slice(b"test".as_ref()).size(),
            BodySize::Sized(4)
        );
        assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");

        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| (&b"test"[..]).poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
        // An empty slice yields no chunk at all.
        assert!(poll_fn(|cx| (&b""[..]).poll_next_chunk(cx)).await.is_none());
    }

    // `Vec<u8>`: conversion into `Body` and direct use as a `MessageBody`.
    #[ntex::test]
    async fn test_vec() {
        assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
        assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");

        assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(
            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        // An empty vector yields no chunk at all.
        assert!(poll_fn(|cx| Vec::<u8>::new().poll_next_chunk(cx))
            .await
            .is_none());
    }

    // `Bytes`: the content is yielded once, then the body is finished.
    #[ntex::test]
    async fn test_bytes() {
        let mut b = Bytes::from("test");
        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b.clone()).get_ref(), b"test");

        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // `BytesMut`: both through `Body` and directly as a `MessageBody`.
    #[ntex::test]
    async fn test_bytes_mut() {
        let mut b = Body::from(BytesMut::from("test"));
        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(b.get_ref(), b"test");
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);

        let mut b = BytesMut::from("test");
        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        // Once the chunk has been taken the buffer reports a size of zero.
        assert_eq!(b.size(), BodySize::Sized(0));
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // `String`: owned and borrowed conversions into `Body`, and direct use
    // as a `MessageBody`.
    #[ntex::test]
    async fn test_string() {
        let mut b = "test".to_owned();
        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b.clone()).get_ref(), b"test");
        assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
        assert_eq!(Body::from(&b).get_ref(), b"test");

        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // `()` is an empty body that yields no chunks.
    #[ntex::test]
    async fn test_unit() {
        assert_eq!(().size(), BodySize::Empty);
        assert!(poll_fn(|cx| ().poll_next_chunk(cx)).await.is_none());
    }

    // `Box<T>` forwards to the boxed body.
    #[ntex::test]
    async fn test_box() {
        let mut val = Box::new(());
        assert_eq!(val.size(), BodySize::Empty);
        assert!(poll_fn(|cx| val.poll_next_chunk(cx)).await.is_none());
    }

    // Equality between `Body` values of the same and of different variants.
    #[ntex::test]
    #[allow(clippy::eq_op)]
    async fn test_body_eq() {
        assert!(Body::None == Body::None);
        assert!(Body::None != Body::Empty);
        assert!(Body::Empty == Body::Empty);
        assert!(Body::Empty != Body::None);
        assert!(
            Body::Bytes(Bytes::from_static(b"1")) == Body::Bytes(Bytes::from_static(b"1"))
        );
        assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
    }

    // `Debug` output names the variant and, for `Bytes`, shows the content.
    #[ntex::test]
    async fn test_body_debug() {
        assert!(format!("{:?}", Body::None).contains("Body::None"));
        assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
        assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains('1'));
    }

    // `BodyStream`: `Debug` output, conversion into `Body::Message`, and
    // wrapping in `ResponseBody`.
    #[ntex::test]
    async fn body_stream() {
        let st = BodyStream::new(stream::once(Ready::<_, io::Error>::Ok(Bytes::from("1"))));
        assert!(format!("{:?}", st).contains("BodyStream"));
        let body: Body = st.into();
        assert!(format!("{:?}", body).contains("Body::Message(_)"));
        assert!(body != Body::None);

        let res = ResponseBody::new(body);
        assert!(res.as_ref().is_some());
    }

    // `BoxedBodyStream`: `Debug` output, conversion into `Body::Message`, and
    // wrapping in `ResponseBody`.
    #[ntex::test]
    async fn boxed_body_stream() {
        let st = BoxedBodyStream::new(stream::once(Ready::<_, Box<dyn Error>>::Ok(
            Bytes::from("1"),
        )));
        assert!(format!("{:?}", st).contains("BoxedBodyStream"));
        let body: Body = st.into();
        assert!(format!("{:?}", body).contains("Body::Message(_)"));
        assert!(body != Body::None);

        let res = ResponseBody::new(body);
        assert!(res.as_ref().is_some());
    }

    // The empty chunk between "1" and "2" is skipped by `BodyStream`.
    #[ntex::test]
    async fn body_skips_empty_chunks() {
        let mut body = BodyStream::new(stream::iter(
            ["1", "", "2"]
                .iter()
                .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, io::Error>),
        ));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("1")),
        );
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("2")),
        );
    }

    // The empty chunk between "1" and "2" is skipped by `SizedStream`.
    #[ntex::test]
    async fn sized_skips_empty_chunks() {
        let mut body = SizedStream::new(
            2,
            stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
        );
        assert!(format!("{:?}", body).contains("SizedStream"));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("1")),
        );
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("2")),
        );
    }
}