Skip to main content

ntex_http/
body.rs

1//! Traits and structures to aid consuming and writing HTTP payloads.
2use std::{error::Error, fmt, marker, mem, pin::Pin, rc::Rc, task::Context, task::Poll};
3
4use futures_core::Stream;
5use ntex_bytes::{BytePages, Bytes, BytesMut};
6
/// Body size hint.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum BodySize {
    /// No body at all.
    None,
    /// Explicitly empty body.
    Empty,
    /// Body with a known length in bytes.
    Sized(u64),
    /// Body of unknown length that is produced as a stream.
    Stream,
}

impl BodySize {
    /// Returns `true` when the hint says there is nothing to send:
    /// `None`, `Empty`, or `Sized(0)`.
    pub fn is_eof(&self) -> bool {
        match self {
            BodySize::None | BodySize::Empty | BodySize::Sized(0) => true,
            BodySize::Sized(_) | BodySize::Stream => false,
        }
    }
}
21
/// Interface for types that can be streamed to a peer.
pub trait MessageBody: 'static {
    /// Message body size hint.
    fn size(&self) -> BodySize;

    /// Polls for the next chunk of the body.
    ///
    /// Yields `Some(Ok(bytes))` for a chunk and `Some(Err(_))` for a failure.
    /// The implementations in this file return `Poll::Ready(None)` once the
    /// body is exhausted.
    fn poll_next_chunk(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>>;
}
32
33impl MessageBody for () {
34    #[inline]
35    fn size(&self) -> BodySize {
36        BodySize::Empty
37    }
38
39    #[inline]
40    fn poll_next_chunk(
41        &mut self,
42        _: &mut Context<'_>,
43    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
44        Poll::Ready(None)
45    }
46}
47
48impl<T: MessageBody> MessageBody for Box<T> {
49    #[inline]
50    fn size(&self) -> BodySize {
51        self.as_ref().size()
52    }
53
54    #[inline]
55    fn poll_next_chunk(
56        &mut self,
57        cx: &mut Context<'_>,
58    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
59        self.as_mut().poll_next_chunk(cx)
60    }
61}
62
#[derive(Debug)]
/// Represents http response body
pub enum ResponseBody<B> {
    /// Body of the caller-chosen type `B`.
    Body(B),
    /// Body stored as the general-purpose [`Body`] type.
    Other(Body),
}
69
70impl ResponseBody<Body> {
71    pub fn into_body<B>(self) -> ResponseBody<B> {
72        match self {
73            ResponseBody::Body(b) | ResponseBody::Other(b) => ResponseBody::Other(b),
74        }
75    }
76}
77
78impl From<ResponseBody<Body>> for Body {
79    fn from(b: ResponseBody<Body>) -> Self {
80        match b {
81            ResponseBody::Body(b) | ResponseBody::Other(b) => b,
82        }
83    }
84}
85
86impl<B> From<Body> for ResponseBody<B> {
87    fn from(b: Body) -> Self {
88        ResponseBody::Other(b)
89    }
90}
91
92impl<B> ResponseBody<B> {
93    #[inline]
94    pub fn new(body: B) -> Self {
95        ResponseBody::Body(body)
96    }
97
98    #[inline]
99    #[must_use]
100    pub fn take_body(&mut self) -> ResponseBody<B> {
101        std::mem::replace(self, ResponseBody::Other(Body::None))
102    }
103}
104
105impl<B: MessageBody> ResponseBody<B> {
106    pub fn as_ref(&self) -> Option<&B> {
107        if let ResponseBody::Body(b) = self {
108            Some(b)
109        } else {
110            None
111        }
112    }
113}
114
115impl<B: MessageBody> MessageBody for ResponseBody<B> {
116    #[inline]
117    fn size(&self) -> BodySize {
118        match self {
119            ResponseBody::Body(body) => body.size(),
120            ResponseBody::Other(body) => body.size(),
121        }
122    }
123
124    #[inline]
125    fn poll_next_chunk(
126        &mut self,
127        cx: &mut Context<'_>,
128    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
129        match self {
130            ResponseBody::Body(body) => body.poll_next_chunk(cx),
131            ResponseBody::Other(body) => body.poll_next_chunk(cx),
132        }
133    }
134}
135
136impl<B: MessageBody + Unpin> Stream for ResponseBody<B> {
137    type Item = Result<Bytes, Rc<dyn Error>>;
138
139    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140        match self.get_mut() {
141            ResponseBody::Body(body) => body.poll_next_chunk(cx),
142            ResponseBody::Other(body) => body.poll_next_chunk(cx),
143        }
144    }
145}
146
/// Represents various types of http message body.
pub enum Body {
    /// Empty response. `Content-Length` header is not set.
    None,
    /// Zero sized response body. `Content-Length` header is set to `0`.
    Empty,
    /// Specific response body held in memory as [`Bytes`].
    Bytes(Bytes),
    /// Generic message body: any boxed [`MessageBody`] implementation.
    Message(Box<dyn MessageBody>),
}
158
159impl Body {
160    /// Create body from slice (copy)
161    pub fn from_slice(s: &[u8]) -> Body {
162        Body::Bytes(Bytes::copy_from_slice(s))
163    }
164
165    /// Create body from generic message body.
166    pub fn from_message<B: MessageBody>(body: B) -> Body {
167        Body::Message(Box::new(body))
168    }
169}
170
171impl MessageBody for Body {
172    #[inline]
173    fn size(&self) -> BodySize {
174        match self {
175            Body::None => BodySize::None,
176            Body::Empty => BodySize::Empty,
177            Body::Bytes(bin) => BodySize::Sized(bin.len() as u64),
178            Body::Message(body) => body.size(),
179        }
180    }
181
182    fn poll_next_chunk(
183        &mut self,
184        cx: &mut Context<'_>,
185    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
186        match self {
187            Body::None | Body::Empty => Poll::Ready(None),
188            Body::Bytes(bin) => {
189                let len = bin.len();
190                if len == 0 {
191                    Poll::Ready(None)
192                } else {
193                    Poll::Ready(Some(Ok(mem::take(bin))))
194                }
195            }
196            Body::Message(body) => body.poll_next_chunk(cx),
197        }
198    }
199}
200
201impl PartialEq for Body {
202    fn eq(&self, other: &Body) -> bool {
203        match (self, other) {
204            (Body::None, Body::None) | (Body::Empty, Body::Empty) => true,
205            (Body::Bytes(b), Body::Bytes(b2)) => b == b2,
206            _ => false,
207        }
208    }
209}
210
211impl fmt::Debug for Body {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        match *self {
214            Body::None => write!(f, "Body::None"),
215            Body::Empty => write!(f, "Body::Empty"),
216            Body::Bytes(ref b) => write!(f, "Body::Bytes({b:?})"),
217            Body::Message(_) => write!(f, "Body::Message(_)"),
218        }
219    }
220}
221
222impl From<&'static str> for Body {
223    fn from(s: &'static str) -> Body {
224        Body::Bytes(Bytes::from_static(s.as_ref()))
225    }
226}
227
228impl From<&'static [u8]> for Body {
229    fn from(s: &'static [u8]) -> Body {
230        Body::Bytes(Bytes::from_static(s))
231    }
232}
233
234impl From<Vec<u8>> for Body {
235    fn from(vec: Vec<u8>) -> Body {
236        Body::Bytes(Bytes::from(vec))
237    }
238}
239
240impl From<String> for Body {
241    fn from(s: String) -> Body {
242        s.into_bytes().into()
243    }
244}
245
246impl<'a> From<&'a String> for Body {
247    fn from(s: &'a String) -> Body {
248        Body::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
249    }
250}
251
252impl From<Bytes> for Body {
253    fn from(s: Bytes) -> Body {
254        Body::Bytes(s)
255    }
256}
257
258impl From<BytesMut> for Body {
259    fn from(s: BytesMut) -> Body {
260        Body::Bytes(s.freeze())
261    }
262}
263
264impl From<BytePages> for Body {
265    fn from(pages: BytePages) -> Body {
266        Body::from_message(pages)
267    }
268}
269
270impl<S> From<SizedStream<S>> for Body
271where
272    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
273{
274    fn from(s: SizedStream<S>) -> Body {
275        Body::from_message(s)
276    }
277}
278
279impl<S, E> From<BodyStream<S, E>> for Body
280where
281    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
282    E: Error + 'static,
283{
284    fn from(s: BodyStream<S, E>) -> Body {
285        Body::from_message(s)
286    }
287}
288
289impl<S> From<BoxedBodyStream<S>> for Body
290where
291    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
292{
293    fn from(s: BoxedBodyStream<S>) -> Body {
294        Body::from_message(s)
295    }
296}
297
298impl MessageBody for Bytes {
299    fn size(&self) -> BodySize {
300        BodySize::Sized(self.len() as u64)
301    }
302
303    fn poll_next_chunk(
304        &mut self,
305        _: &mut Context<'_>,
306    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
307        if self.is_empty() {
308            Poll::Ready(None)
309        } else {
310            Poll::Ready(Some(Ok(mem::take(self))))
311        }
312    }
313}
314
315impl MessageBody for BytesMut {
316    fn size(&self) -> BodySize {
317        BodySize::Sized(self.len() as u64)
318    }
319
320    fn poll_next_chunk(
321        &mut self,
322        _: &mut Context<'_>,
323    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
324        if self.is_empty() {
325            Poll::Ready(None)
326        } else {
327            Poll::Ready(Some(Ok(mem::take(self).freeze())))
328        }
329    }
330}
331
332impl MessageBody for BytePages {
333    fn size(&self) -> BodySize {
334        BodySize::Sized(self.len() as u64)
335    }
336
337    fn poll_next_chunk(
338        &mut self,
339        _: &mut Context<'_>,
340    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
341        if let Some(page) = self.take() {
342            Poll::Ready(Some(Ok(page.freeze())))
343        } else {
344            Poll::Ready(None)
345        }
346    }
347}
348
349impl MessageBody for &'static str {
350    fn size(&self) -> BodySize {
351        BodySize::Sized(self.len() as u64)
352    }
353
354    fn poll_next_chunk(
355        &mut self,
356        _: &mut Context<'_>,
357    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
358        if self.is_empty() {
359            Poll::Ready(None)
360        } else {
361            Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self).as_ref()))))
362        }
363    }
364}
365
366impl MessageBody for &'static [u8] {
367    fn size(&self) -> BodySize {
368        BodySize::Sized(self.len() as u64)
369    }
370
371    fn poll_next_chunk(
372        &mut self,
373        _: &mut Context<'_>,
374    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
375        if self.is_empty() {
376            Poll::Ready(None)
377        } else {
378            Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self)))))
379        }
380    }
381}
382
383impl MessageBody for Vec<u8> {
384    fn size(&self) -> BodySize {
385        BodySize::Sized(self.len() as u64)
386    }
387
388    fn poll_next_chunk(
389        &mut self,
390        _: &mut Context<'_>,
391    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
392        if self.is_empty() {
393            Poll::Ready(None)
394        } else {
395            Poll::Ready(Some(Ok(Bytes::from(mem::take(self)))))
396        }
397    }
398}
399
400impl MessageBody for String {
401    fn size(&self) -> BodySize {
402        BodySize::Sized(self.len() as u64)
403    }
404
405    fn poll_next_chunk(
406        &mut self,
407        _: &mut Context<'_>,
408    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
409        if self.is_empty() {
410            Poll::Ready(None)
411        } else {
412            Poll::Ready(Some(Ok(Bytes::from(mem::take(self).into_bytes()))))
413        }
414    }
415}
416
/// Type representing a streaming body.
///
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
pub struct BodyStream<S, E> {
    /// Underlying stream that produces the chunks.
    stream: S,
    /// Records the stream's error type `E` without storing a value of it.
    _t: marker::PhantomData<E>,
}
424
425impl<S, E> BodyStream<S, E>
426where
427    S: Stream<Item = Result<Bytes, E>> + Unpin,
428    E: Error,
429{
430    pub fn new(stream: S) -> Self {
431        BodyStream {
432            stream,
433            _t: marker::PhantomData,
434        }
435    }
436}
437
438impl<S, E> fmt::Debug for BodyStream<S, E> {
439    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
440        f.debug_struct("BodyStream")
441            .field("stream", &std::any::type_name::<S>())
442            .field("error", &std::any::type_name::<E>())
443            .finish()
444    }
445}
446
447impl<S, E> MessageBody for BodyStream<S, E>
448where
449    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
450    E: Error + 'static,
451{
452    fn size(&self) -> BodySize {
453        BodySize::Stream
454    }
455
456    /// Attempts to pull out the next value of the underlying [`Stream`].
457    ///
458    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
459    /// ended on a zero-length chunk, but rather proceed until the underlying
460    /// [`Stream`] ends.
461    fn poll_next_chunk(
462        &mut self,
463        cx: &mut Context<'_>,
464    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
465        loop {
466            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
467                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
468                Poll::Ready(opt) => opt.map(|res| {
469                    res.map_err(|e| {
470                        let e: Rc<dyn Error> = Rc::new(e);
471                        e
472                    })
473                }),
474                Poll::Pending => return Poll::Pending,
475            });
476        }
477    }
478}
479
/// Type representing a streaming body whose errors are already `Rc<dyn Error>`.
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
pub struct BoxedBodyStream<S> {
    /// Underlying stream that produces the chunks.
    stream: S,
}
485
486impl<S> BoxedBodyStream<S>
487where
488    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
489{
490    pub fn new(stream: S) -> Self {
491        BoxedBodyStream { stream }
492    }
493}
494
495impl<S> fmt::Debug for BoxedBodyStream<S> {
496    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
497        f.debug_struct("BoxedBodyStream")
498            .field("stream", &std::any::type_name::<S>())
499            .finish()
500    }
501}
502
503impl<S> MessageBody for BoxedBodyStream<S>
504where
505    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
506{
507    fn size(&self) -> BodySize {
508        BodySize::Stream
509    }
510
511    /// Attempts to pull out the next value of the underlying [`Stream`].
512    ///
513    /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
514    /// ended on a zero-length chunk, but rather proceed until the underlying
515    /// [`Stream`] ends.
516    fn poll_next_chunk(
517        &mut self,
518        cx: &mut Context<'_>,
519    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
520        loop {
521            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
522                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
523                Poll::Ready(opt) => opt,
524                Poll::Pending => return Poll::Pending,
525            });
526        }
527    }
528}
529
/// Type representing a streaming body. This body implementation should be used
/// if the total size of the stream is known. Data is sent as is, without using transfer encoding.
pub struct SizedStream<S> {
    /// Size reported through `MessageBody::size`, as given to `new`.
    size: u64,
    /// Underlying stream that produces the chunks.
    stream: S,
}
536
537impl<S> SizedStream<S>
538where
539    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
540{
541    pub fn new(size: u64, stream: S) -> Self {
542        SizedStream { size, stream }
543    }
544}
545
546impl<S> fmt::Debug for SizedStream<S> {
547    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548        f.debug_struct("SizedStream")
549            .field("size", &self.size)
550            .field("stream", &std::any::type_name::<S>())
551            .finish()
552    }
553}
554
555impl<S> MessageBody for SizedStream<S>
556where
557    S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
558{
559    fn size(&self) -> BodySize {
560        BodySize::Sized(self.size)
561    }
562
563    /// Attempts to pull out the next value of the underlying [`Stream`].
564    ///
565    /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
566    /// ended on a zero-length chunk, but rather proceed until the underlying
567    /// [`Stream`] ends.
568    fn poll_next_chunk(
569        &mut self,
570        cx: &mut Context<'_>,
571    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
572        loop {
573            return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
574                Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
575                Poll::Ready(val) => val,
576                Poll::Pending => return Poll::Pending,
577            });
578        }
579    }
580}
581
// Unit tests for the body types and the `MessageBody` implementations above.
#[cfg(test)]
mod tests {
    use std::{future::poll_fn, io};

    use futures_util::stream;
    use ntex::util::Ready;

    use super::*;

    // Test-only helper: borrows the bytes of a `Body::Bytes`, panicking for
    // any other variant.
    impl Body {
        pub(crate) fn get_ref(&self) -> &[u8] {
            if let Body::Bytes(bin) = self { bin } else { panic!() }
        }
    }

    // Pins the in-memory size of the body types.
    #[ntex::test]
    async fn test_size() {
        assert_eq!(32, std::mem::size_of::<Body>());
        assert_eq!(32, std::mem::size_of::<ResponseBody<Bytes>>());
        assert_eq!(40, std::mem::size_of::<ResponseBody<Body>>());
    }

    // `&'static str` as a body and as a source for `Body`.
    #[ntex::test]
    async fn test_static_str() {
        assert_eq!(Body::from("").size(), BodySize::Sized(0));
        assert_eq!(Body::from("test").size(), BodySize::Sized(4));
        assert_eq!(Body::from("test").get_ref(), b"test");

        assert_eq!("test".size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(
            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| "".poll_next_chunk(cx)).await.is_none());
    }

    // `BytePages` converted to `Body`: appended pages come back as one chunk.
    #[ntex::test]
    async fn test_pages() {
        let mut pages = BytePages::default();
        pages.append(Bytes::from("1111"));
        pages.append(Bytes::from("2222"));
        pages.append(Bytes::from("3333"));

        let mut body = Body::from(pages);
        assert_eq!(body.size(), BodySize::Sized(12));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("111122223333"))
        );
        assert!(poll_fn(|cx| body.poll_next_chunk(cx)).await.is_none());
    }

    // `&'static [u8]` as a body and as a source for `Body`.
    #[ntex::test]
    async fn test_static_bytes() {
        assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
        assert_eq!(
            Body::from_slice(b"test".as_ref()).size(),
            BodySize::Sized(4)
        );
        assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");

        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| (&b"test"[..]).poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
        assert!(poll_fn(|cx| (&b""[..]).poll_next_chunk(cx)).await.is_none());
    }

    // `Vec<u8>` as a body and as a source for `Body`.
    #[ntex::test]
    async fn test_vec() {
        assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
        assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");

        assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(
            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert!(
            poll_fn(|cx| Vec::<u8>::new().poll_next_chunk(cx))
                .await
                .is_none()
        );
    }

    // `Bytes` yields its contents once and is then exhausted.
    #[ntex::test]
    async fn test_bytes() {
        let mut b = Bytes::from("test");
        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b.clone()).get_ref(), b"test");

        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // `BytesMut`, both wrapped in `Body` and used directly as a body.
    #[ntex::test]
    async fn test_bytes_mut() {
        let mut b = Body::from(BytesMut::from("test"));
        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(b.get_ref(), b"test");
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);

        let mut b = BytesMut::from("test");
        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(b.size(), BodySize::Sized(0));
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // `String` and `&String` as sources for `Body`, and `String` as a body.
    #[ntex::test]
    async fn test_string() {
        let mut b = "test".to_owned();
        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b.clone()).get_ref(), b"test");
        assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
        assert_eq!(Body::from(&b).get_ref(), b"test");

        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // `()` is an empty body.
    #[ntex::test]
    async fn test_unit() {
        assert_eq!(().size(), BodySize::Empty);
        assert!(poll_fn(|cx| ().poll_next_chunk(cx)).await.is_none());
    }

    // `Box<T>` forwards to the boxed body.
    #[ntex::test]
    async fn test_box() {
        let mut val = Box::new(());
        assert_eq!(val.size(), BodySize::Empty);
        assert!(poll_fn(|cx| val.poll_next_chunk(cx)).await.is_none());
    }

    // `PartialEq` between `Body` variants.
    #[ntex::test]
    #[allow(clippy::eq_op)]
    async fn test_body_eq() {
        assert!(Body::None == Body::None);
        assert!(Body::None != Body::Empty);
        assert!(Body::Empty == Body::Empty);
        assert!(Body::Empty != Body::None);
        assert!(
            Body::Bytes(Bytes::from_static(b"1")) == Body::Bytes(Bytes::from_static(b"1"))
        );
        assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
    }

    // `Debug` output of `Body` names the variant.
    #[ntex::test]
    async fn test_body_debug() {
        assert!(format!("{:?}", Body::None).contains("Body::None"));
        assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
        assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains('1'));
    }

    // `BodyStream`: Debug output, conversion to `Body`, wrapping in `ResponseBody`.
    #[ntex::test]
    async fn body_stream() {
        let st = BodyStream::new(stream::once(Ready::<_, io::Error>::Ok(Bytes::from("1"))));
        assert!(format!("{st:?}").contains("BodyStream"));
        let body: Body = st.into();
        assert!(format!("{body:?}").contains("Body::Message(_)"));
        assert!(body != Body::None);

        let res = ResponseBody::new(body);
        assert!(res.as_ref().is_some());
    }

    // `BoxedBodyStream`: Debug output, conversion to `Body`, wrapping in `ResponseBody`.
    #[ntex::test]
    async fn boxed_body_stream() {
        let st = BoxedBodyStream::new(stream::once(Ready::<_, Rc<dyn Error>>::Ok(
            Bytes::from("1"),
        )));
        assert!(format!("{st:?}").contains("BoxedBodyStream"));
        let body: Body = st.into();
        assert!(format!("{body:?}").contains("Body::Message(_)"));
        assert!(body != Body::None);

        let res = ResponseBody::new(body);
        assert!(res.as_ref().is_some());
    }

    // An empty chunk in the middle of a `BodyStream` is not surfaced.
    #[ntex::test]
    async fn body_skips_empty_chunks() {
        let mut body = BodyStream::new(stream::iter(
            ["1", "", "2"]
                .iter()
                .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, io::Error>),
        ));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("1")),
        );
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("2")),
        );
    }

    // An empty chunk in the middle of a `SizedStream` is not surfaced.
    #[ntex::test]
    async fn sized_skips_empty_chunks() {
        let mut body = SizedStream::new(
            2,
            stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
        );
        assert!(format!("{body:?}").contains("SizedStream"));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("1")),
        );
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("2")),
        );
    }
}