ntex_http/
body.rs

1//! Traits and structures to aid consuming and writing HTTP payloads.
2use std::{
3    error::Error, fmt, marker::PhantomData, mem, pin::Pin, rc::Rc, task::Context,
4    task::Poll,
5};
6
7use futures_core::Stream;
8use ntex_bytes::{Bytes, BytesMut};
9
10#[derive(Debug, PartialEq, Eq, Copy, Clone)]
11/// Body size hint
12pub enum BodySize {
13    None,
14    Empty,
15    Sized(u64),
16    Stream,
17}
18
19impl BodySize {
20    pub fn is_eof(&self) -> bool {
21        matches!(self, BodySize::None | BodySize::Empty | BodySize::Sized(0))
22    }
23}
24
25/// Interface for types that can be streamed to a peer.
26pub trait MessageBody: 'static {
27    /// Message body size hind
28    fn size(&self) -> BodySize;
29
30    fn poll_next_chunk(
31        &mut self,
32        cx: &mut Context<'_>,
33    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>>;
34}
35
36impl MessageBody for () {
37    #[inline]
38    fn size(&self) -> BodySize {
39        BodySize::Empty
40    }
41
42    #[inline]
43    fn poll_next_chunk(
44        &mut self,
45        _: &mut Context<'_>,
46    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
47        Poll::Ready(None)
48    }
49}
50
51impl<T: MessageBody> MessageBody for Box<T> {
52    #[inline]
53    fn size(&self) -> BodySize {
54        self.as_ref().size()
55    }
56
57    #[inline]
58    fn poll_next_chunk(
59        &mut self,
60        cx: &mut Context<'_>,
61    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
62        self.as_mut().poll_next_chunk(cx)
63    }
64}
65
66#[derive(Debug)]
67/// Represents http response body
68pub enum ResponseBody<B> {
69    Body(B),
70    Other(Body),
71}
72
73impl ResponseBody<Body> {
74    pub fn into_body<B>(self) -> ResponseBody<B> {
75        match self {
76            ResponseBody::Body(b) => ResponseBody::Other(b),
77            ResponseBody::Other(b) => ResponseBody::Other(b),
78        }
79    }
80}
81
82impl From<ResponseBody<Body>> for Body {
83    fn from(b: ResponseBody<Body>) -> Self {
84        match b {
85            ResponseBody::Body(b) => b,
86            ResponseBody::Other(b) => b,
87        }
88    }
89}
90
91impl<B> From<Body> for ResponseBody<B> {
92    fn from(b: Body) -> Self {
93        ResponseBody::Other(b)
94    }
95}
96
97impl<B> ResponseBody<B> {
98    #[inline]
99    pub fn new(body: B) -> Self {
100        ResponseBody::Body(body)
101    }
102
103    #[inline]
104    pub fn take_body(&mut self) -> ResponseBody<B> {
105        std::mem::replace(self, ResponseBody::Other(Body::None))
106    }
107}
108
109impl<B: MessageBody> ResponseBody<B> {
110    pub fn as_ref(&self) -> Option<&B> {
111        if let ResponseBody::Body(ref b) = self {
112            Some(b)
113        } else {
114            None
115        }
116    }
117}
118
119impl<B: MessageBody> MessageBody for ResponseBody<B> {
120    #[inline]
121    fn size(&self) -> BodySize {
122        match self {
123            ResponseBody::Body(ref body) => body.size(),
124            ResponseBody::Other(ref body) => body.size(),
125        }
126    }
127
128    #[inline]
129    fn poll_next_chunk(
130        &mut self,
131        cx: &mut Context<'_>,
132    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
133        match self {
134            ResponseBody::Body(ref mut body) => body.poll_next_chunk(cx),
135            ResponseBody::Other(ref mut body) => body.poll_next_chunk(cx),
136        }
137    }
138}
139
140impl<B: MessageBody + Unpin> Stream for ResponseBody<B> {
141    type Item = Result<Bytes, Rc<dyn Error>>;
142
143    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144        match self.get_mut() {
145            ResponseBody::Body(ref mut body) => body.poll_next_chunk(cx),
146            ResponseBody::Other(ref mut body) => body.poll_next_chunk(cx),
147        }
148    }
149}
150
151/// Represents various types of http message body.
152pub enum Body {
153    /// Empty response. `Content-Length` header is not set.
154    None,
155    /// Zero sized response body. `Content-Length` header is set to `0`.
156    Empty,
157    /// Specific response body.
158    Bytes(Bytes),
159    /// Generic message body.
160    Message(Box<dyn MessageBody>),
161}
162
163impl Body {
164    /// Create body from slice (copy)
165    pub fn from_slice(s: &[u8]) -> Body {
166        Body::Bytes(Bytes::copy_from_slice(s))
167    }
168
169    /// Create body from generic message body.
170    pub fn from_message<B: MessageBody>(body: B) -> Body {
171        Body::Message(Box::new(body))
172    }
173}
174
175impl MessageBody for Body {
176    #[inline]
177    fn size(&self) -> BodySize {
178        match self {
179            Body::None => BodySize::None,
180            Body::Empty => BodySize::Empty,
181            Body::Bytes(ref bin) => BodySize::Sized(bin.len() as u64),
182            Body::Message(ref body) => body.size(),
183        }
184    }
185
186    fn poll_next_chunk(
187        &mut self,
188        cx: &mut Context<'_>,
189    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
190        match self {
191            Body::None => Poll::Ready(None),
192            Body::Empty => Poll::Ready(None),
193            Body::Bytes(ref mut bin) => {
194                let len = bin.len();
195                if len == 0 {
196                    Poll::Ready(None)
197                } else {
198                    Poll::Ready(Some(Ok(mem::take(bin))))
199                }
200            }
201            Body::Message(ref mut body) => body.poll_next_chunk(cx),
202        }
203    }
204}
205
206impl PartialEq for Body {
207    fn eq(&self, other: &Body) -> bool {
208        match (self, other) {
209            (Body::None, Body::None) => true,
210            (Body::Empty, Body::Empty) => true,
211            (Body::Bytes(ref b), Body::Bytes(ref b2)) => b == b2,
212            _ => false,
213        }
214    }
215}
216
217impl fmt::Debug for Body {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        match *self {
220            Body::None => write!(f, "Body::None"),
221            Body::Empty => write!(f, "Body::Empty"),
222            Body::Bytes(ref b) => write!(f, "Body::Bytes({b:?})"),
223            Body::Message(_) => write!(f, "Body::Message(_)"),
224        }
225    }
226}
227
228impl From<&'static str> for Body {
229    fn from(s: &'static str) -> Body {
230        Body::Bytes(Bytes::from_static(s.as_ref()))
231    }
232}
233
234impl From<&'static [u8]> for Body {
235    fn from(s: &'static [u8]) -> Body {
236        Body::Bytes(Bytes::from_static(s))
237    }
238}
239
240impl From<Vec<u8>> for Body {
241    fn from(vec: Vec<u8>) -> Body {
242        Body::Bytes(Bytes::from(vec))
243    }
244}
245
246impl From<String> for Body {
247    fn from(s: String) -> Body {
248        s.into_bytes().into()
249    }
250}
251
252impl<'a> From<&'a String> for Body {
253    fn from(s: &'a String) -> Body {
254        Body::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
255    }
256}
257
258impl From<Bytes> for Body {
259    fn from(s: Bytes) -> Body {
260        Body::Bytes(s)
261    }
262}
263
264impl From<BytesMut> for Body {
265    fn from(s: BytesMut) -> Body {
266        Body::Bytes(s.freeze())
267    }
268}
269
270impl<S> From<SizedStream<S>> for Body
271where
272    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
273{
274    fn from(s: SizedStream<S>) -> Body {
275        Body::from_message(s)
276    }
277}
278
279impl<S, E> From<BodyStream<S, E>> for Body
280where
281    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
282    E: Error + 'static,
283{
284    fn from(s: BodyStream<S, E>) -> Body {
285        Body::from_message(s)
286    }
287}
288
289impl<S> From<BoxedBodyStream<S>> for Body
290where
291    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
292{
293    fn from(s: BoxedBodyStream<S>) -> Body {
294        Body::from_message(s)
295    }
296}
297
298impl MessageBody for Bytes {
299    fn size(&self) -> BodySize {
300        BodySize::Sized(self.len() as u64)
301    }
302
303    fn poll_next_chunk(
304        &mut self,
305        _: &mut Context<'_>,
306    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
307        if self.is_empty() {
308            Poll::Ready(None)
309        } else {
310            Poll::Ready(Some(Ok(mem::take(self))))
311        }
312    }
313}
314
315impl MessageBody for BytesMut {
316    fn size(&self) -> BodySize {
317        BodySize::Sized(self.len() as u64)
318    }
319
320    fn poll_next_chunk(
321        &mut self,
322        _: &mut Context<'_>,
323    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
324        if self.is_empty() {
325            Poll::Ready(None)
326        } else {
327            Poll::Ready(Some(Ok(mem::take(self).freeze())))
328        }
329    }
330}
331
332impl MessageBody for &'static str {
333    fn size(&self) -> BodySize {
334        BodySize::Sized(self.len() as u64)
335    }
336
337    fn poll_next_chunk(
338        &mut self,
339        _: &mut Context<'_>,
340    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
341        if self.is_empty() {
342            Poll::Ready(None)
343        } else {
344            Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self).as_ref()))))
345        }
346    }
347}
348
349impl MessageBody for &'static [u8] {
350    fn size(&self) -> BodySize {
351        BodySize::Sized(self.len() as u64)
352    }
353
354    fn poll_next_chunk(
355        &mut self,
356        _: &mut Context<'_>,
357    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
358        if self.is_empty() {
359            Poll::Ready(None)
360        } else {
361            Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self)))))
362        }
363    }
364}
365
366impl MessageBody for Vec<u8> {
367    fn size(&self) -> BodySize {
368        BodySize::Sized(self.len() as u64)
369    }
370
371    fn poll_next_chunk(
372        &mut self,
373        _: &mut Context<'_>,
374    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
375        if self.is_empty() {
376            Poll::Ready(None)
377        } else {
378            Poll::Ready(Some(Ok(Bytes::from(mem::take(self)))))
379        }
380    }
381}
382
383impl MessageBody for String {
384    fn size(&self) -> BodySize {
385        BodySize::Sized(self.len() as u64)
386    }
387
388    fn poll_next_chunk(
389        &mut self,
390        _: &mut Context<'_>,
391    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
392        if self.is_empty() {
393            Poll::Ready(None)
394        } else {
395            Poll::Ready(Some(Ok(Bytes::from(mem::take(self).into_bytes()))))
396        }
397    }
398}
399
400/// Type represent streaming body.
401///
402/// Response does not contain `content-length` header and appropriate transfer encoding is used.
403pub struct BodyStream<S, E> {
404    stream: S,
405    _t: PhantomData<E>,
406}
407
408impl<S, E> BodyStream<S, E>
409where
410    S: Stream<Item = Result<Bytes, E>> + Unpin,
411    E: Error,
412{
413    pub fn new(stream: S) -> Self {
414        BodyStream {
415            stream,
416            _t: PhantomData,
417        }
418    }
419}
420
421impl<S, E> fmt::Debug for BodyStream<S, E> {
422    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
423        f.debug_struct("BodyStream")
424            .field("stream", &std::any::type_name::<S>())
425            .field("error", &std::any::type_name::<E>())
426            .finish()
427    }
428}
429
430impl<S, E> MessageBody for BodyStream<S, E>
431where
432    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
433    E: Error + 'static,
434{
435    fn size(&self) -> BodySize {
436        BodySize::Stream
437    }
438
439    /// Attempts to pull out the next value of the underlying [`Stream`].
440    ///
441    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
442    /// ended on a zero-length chunk, but rather proceed until the underlying
443    /// [`Stream`] ends.
444    fn poll_next_chunk(
445        &mut self,
446        cx: &mut Context<'_>,
447    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
448        loop {
449            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
450                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
451                Poll::Ready(opt) => opt.map(|res| {
452                    res.map_err(|e| {
453                        let e: Rc<dyn Error> = Rc::new(e);
454                        e
455                    })
456                }),
457                Poll::Pending => return Poll::Pending,
458            });
459        }
460    }
461}
462
463/// Type represent streaming body.
464/// Response does not contain `content-length` header and appropriate transfer encoding is used.
465pub struct BoxedBodyStream<S> {
466    stream: S,
467}
468
469impl<S> BoxedBodyStream<S>
470where
471    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
472{
473    pub fn new(stream: S) -> Self {
474        BoxedBodyStream { stream }
475    }
476}
477
478impl<S> fmt::Debug for BoxedBodyStream<S> {
479    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
480        f.debug_struct("BoxedBodyStream")
481            .field("stream", &std::any::type_name::<S>())
482            .finish()
483    }
484}
485
486impl<S> MessageBody for BoxedBodyStream<S>
487where
488    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
489{
490    fn size(&self) -> BodySize {
491        BodySize::Stream
492    }
493
494    /// Attempts to pull out the next value of the underlying [`Stream`].
495    ///
496    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
497    /// ended on a zero-length chunk, but rather proceed until the underlying
498    /// [`Stream`] ends.
499    fn poll_next_chunk(
500        &mut self,
501        cx: &mut Context<'_>,
502    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
503        loop {
504            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
505                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
506                Poll::Ready(opt) => opt,
507                Poll::Pending => return Poll::Pending,
508            });
509        }
510    }
511}
512
513/// Type represent streaming body. This body implementation should be used
514/// if total size of stream is known. Data get sent as is without using transfer encoding.
515pub struct SizedStream<S> {
516    size: u64,
517    stream: S,
518}
519
520impl<S> SizedStream<S>
521where
522    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
523{
524    pub fn new(size: u64, stream: S) -> Self {
525        SizedStream { size, stream }
526    }
527}
528
529impl<S> fmt::Debug for SizedStream<S> {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        f.debug_struct("SizedStream")
532            .field("size", &self.size)
533            .field("stream", &std::any::type_name::<S>())
534            .finish()
535    }
536}
537
538impl<S> MessageBody for SizedStream<S>
539where
540    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
541{
542    fn size(&self) -> BodySize {
543        BodySize::Sized(self.size)
544    }
545
546    /// Attempts to pull out the next value of the underlying [`Stream`].
547    ///
548    /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
549    /// ended on a zero-length chunk, but rather proceed until the underlying
550    /// [`Stream`] ends.
551    fn poll_next_chunk(
552        &mut self,
553        cx: &mut Context<'_>,
554    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
555        loop {
556            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
557                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
558                Poll::Ready(val) => val,
559                Poll::Pending => return Poll::Pending,
560            });
561        }
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use std::{future::poll_fn, io};
568
569    use futures_util::stream;
570    use ntex_util::future::Ready;
571
572    use super::*;
573
574    impl Body {
575        pub(crate) fn get_ref(&self) -> &[u8] {
576            match *self {
577                Body::Bytes(ref bin) => bin,
578                _ => panic!(),
579            }
580        }
581    }
582
583    #[ntex::test]
584    async fn test_static_str() {
585        assert_eq!(Body::from("").size(), BodySize::Sized(0));
586        assert_eq!(Body::from("test").size(), BodySize::Sized(4));
587        assert_eq!(Body::from("test").get_ref(), b"test");
588
589        assert_eq!("test".size(), BodySize::Sized(4));
590        assert_eq!(
591            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
592            Some(Bytes::from("test"))
593        );
594        assert_eq!(
595            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
596            Some(Bytes::from("test"))
597        );
598        assert!(poll_fn(|cx| "".poll_next_chunk(cx)).await.is_none());
599    }
600
601    #[ntex::test]
602    async fn test_static_bytes() {
603        assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
604        assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
605        assert_eq!(
606            Body::from_slice(b"test".as_ref()).size(),
607            BodySize::Sized(4)
608        );
609        assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
610
611        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
612        assert_eq!(
613            poll_fn(|cx| (&b"test"[..]).poll_next_chunk(cx))
614                .await
615                .unwrap()
616                .ok(),
617            Some(Bytes::from("test"))
618        );
619        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
620        assert!(poll_fn(|cx| (&b""[..]).poll_next_chunk(cx)).await.is_none());
621    }
622
623    #[ntex::test]
624    async fn test_vec() {
625        assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
626        assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");
627
628        assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
629        assert_eq!(
630            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
631                .await
632                .unwrap()
633                .ok(),
634            Some(Bytes::from("test"))
635        );
636        assert_eq!(
637            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
638                .await
639                .unwrap()
640                .ok(),
641            Some(Bytes::from("test"))
642        );
643        assert!(poll_fn(|cx| Vec::<u8>::new().poll_next_chunk(cx))
644            .await
645            .is_none());
646    }
647
648    #[ntex::test]
649    async fn test_bytes() {
650        let mut b = Bytes::from("test");
651        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
652        assert_eq!(Body::from(b.clone()).get_ref(), b"test");
653
654        assert_eq!(b.size(), BodySize::Sized(4));
655        assert_eq!(
656            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
657            Some(Bytes::from("test"))
658        );
659        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
660    }
661
662    #[ntex::test]
663    async fn test_bytes_mut() {
664        let mut b = Body::from(BytesMut::from("test"));
665        assert_eq!(b.size(), BodySize::Sized(4));
666        assert_eq!(b.get_ref(), b"test");
667        assert_eq!(
668            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
669            Some(Bytes::from("test"))
670        );
671        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
672
673        let mut b = BytesMut::from("test");
674        assert_eq!(b.size(), BodySize::Sized(4));
675        assert_eq!(
676            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
677            Some(Bytes::from("test"))
678        );
679        assert_eq!(b.size(), BodySize::Sized(0));
680        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
681    }
682
683    #[ntex::test]
684    async fn test_string() {
685        let mut b = "test".to_owned();
686        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
687        assert_eq!(Body::from(b.clone()).get_ref(), b"test");
688        assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
689        assert_eq!(Body::from(&b).get_ref(), b"test");
690
691        assert_eq!(b.size(), BodySize::Sized(4));
692        assert_eq!(
693            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
694            Some(Bytes::from("test"))
695        );
696        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
697    }
698
699    #[ntex::test]
700    async fn test_unit() {
701        assert_eq!(().size(), BodySize::Empty);
702        assert!(poll_fn(|cx| ().poll_next_chunk(cx)).await.is_none());
703    }
704
705    #[ntex::test]
706    async fn test_box() {
707        let mut val = Box::new(());
708        assert_eq!(val.size(), BodySize::Empty);
709        assert!(poll_fn(|cx| val.poll_next_chunk(cx)).await.is_none());
710    }
711
712    #[ntex::test]
713    #[allow(clippy::eq_op)]
714    async fn test_body_eq() {
715        assert!(Body::None == Body::None);
716        assert!(Body::None != Body::Empty);
717        assert!(Body::Empty == Body::Empty);
718        assert!(Body::Empty != Body::None);
719        assert!(
720            Body::Bytes(Bytes::from_static(b"1")) == Body::Bytes(Bytes::from_static(b"1"))
721        );
722        assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
723    }
724
725    #[ntex::test]
726    async fn test_body_debug() {
727        assert!(format!("{:?}", Body::None).contains("Body::None"));
728        assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
729        assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains('1'));
730    }
731
732    #[ntex::test]
733    async fn body_stream() {
734        let st = BodyStream::new(stream::once(Ready::<_, io::Error>::Ok(Bytes::from("1"))));
735        assert!(format!("{st:?}").contains("BodyStream"));
736        let body: Body = st.into();
737        assert!(format!("{body:?}").contains("Body::Message(_)"));
738        assert!(body != Body::None);
739
740        let res = ResponseBody::new(body);
741        assert!(res.as_ref().is_some());
742    }
743
744    #[ntex::test]
745    async fn boxed_body_stream() {
746        let st = BoxedBodyStream::new(stream::once(Ready::<_, Rc<dyn Error>>::Ok(
747            Bytes::from("1"),
748        )));
749        assert!(format!("{st:?}").contains("BoxedBodyStream"));
750        let body: Body = st.into();
751        assert!(format!("{body:?}").contains("Body::Message(_)"));
752        assert!(body != Body::None);
753
754        let res = ResponseBody::new(body);
755        assert!(res.as_ref().is_some());
756    }
757
758    #[ntex::test]
759    async fn body_skips_empty_chunks() {
760        let mut body = BodyStream::new(stream::iter(
761            ["1", "", "2"]
762                .iter()
763                .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, io::Error>),
764        ));
765        assert_eq!(
766            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
767            Some(Bytes::from("1")),
768        );
769        assert_eq!(
770            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
771            Some(Bytes::from("2")),
772        );
773    }
774
775    #[ntex::test]
776    async fn sized_skips_empty_chunks() {
777        let mut body = SizedStream::new(
778            2,
779            stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
780        );
781        assert!(format!("{body:?}").contains("SizedStream"));
782        assert_eq!(
783            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
784            Some(Bytes::from("1")),
785        );
786        assert_eq!(
787            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
788            Some(Bytes::from("2")),
789        );
790    }
791}