1use std::{
3 error::Error, fmt, marker::PhantomData, mem, pin::Pin, task::Context, task::Poll,
4};
5
6use futures_core::Stream;
7use ntex_bytes::{Bytes, BytesMut};
8
/// Size category of a message body, as reported by `MessageBody::size`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BodySize {
    /// Size reported by `Body::None`.
    None,
    /// Size reported by `Body::Empty` and by the `()` body.
    Empty,
    /// A byte count is known for the body.
    Sized(u64),
    /// Size reported by the stream-backed bodies that carry no length.
    Stream,
}

impl BodySize {
    /// Returns `true` when there is nothing to read: `None`, `Empty`
    /// and `Sized(0)`. Every other value yields `false`.
    pub fn is_eof(&self) -> bool {
        match *self {
            BodySize::None | BodySize::Empty => true,
            BodySize::Sized(len) => len == 0,
            BodySize::Stream => false,
        }
    }
}
23
24pub trait MessageBody: 'static {
26 fn size(&self) -> BodySize;
28
29 fn poll_next_chunk(
30 &mut self,
31 cx: &mut Context<'_>,
32 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>>;
33}
34
35impl MessageBody for () {
36 #[inline]
37 fn size(&self) -> BodySize {
38 BodySize::Empty
39 }
40
41 #[inline]
42 fn poll_next_chunk(
43 &mut self,
44 _: &mut Context<'_>,
45 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
46 Poll::Ready(None)
47 }
48}
49
50impl<T: MessageBody> MessageBody for Box<T> {
51 #[inline]
52 fn size(&self) -> BodySize {
53 self.as_ref().size()
54 }
55
56 #[inline]
57 fn poll_next_chunk(
58 &mut self,
59 cx: &mut Context<'_>,
60 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
61 self.as_mut().poll_next_chunk(cx)
62 }
63}
64
65#[derive(Debug)]
66pub enum ResponseBody<B> {
68 Body(B),
69 Other(Body),
70}
71
72impl ResponseBody<Body> {
73 pub fn into_body<B>(self) -> ResponseBody<B> {
74 match self {
75 ResponseBody::Body(b) => ResponseBody::Other(b),
76 ResponseBody::Other(b) => ResponseBody::Other(b),
77 }
78 }
79}
80
81impl From<ResponseBody<Body>> for Body {
82 fn from(b: ResponseBody<Body>) -> Self {
83 match b {
84 ResponseBody::Body(b) => b,
85 ResponseBody::Other(b) => b,
86 }
87 }
88}
89
90impl<B> From<Body> for ResponseBody<B> {
91 fn from(b: Body) -> Self {
92 ResponseBody::Other(b)
93 }
94}
95
96impl<B> ResponseBody<B> {
97 #[inline]
98 pub fn new(body: B) -> Self {
99 ResponseBody::Body(body)
100 }
101
102 #[inline]
103 pub fn take_body(&mut self) -> ResponseBody<B> {
104 std::mem::replace(self, ResponseBody::Other(Body::None))
105 }
106}
107
108impl<B: MessageBody> ResponseBody<B> {
109 pub fn as_ref(&self) -> Option<&B> {
110 if let ResponseBody::Body(ref b) = self {
111 Some(b)
112 } else {
113 None
114 }
115 }
116}
117
118impl<B: MessageBody> MessageBody for ResponseBody<B> {
119 #[inline]
120 fn size(&self) -> BodySize {
121 match self {
122 ResponseBody::Body(ref body) => body.size(),
123 ResponseBody::Other(ref body) => body.size(),
124 }
125 }
126
127 #[inline]
128 fn poll_next_chunk(
129 &mut self,
130 cx: &mut Context<'_>,
131 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
132 match self {
133 ResponseBody::Body(ref mut body) => body.poll_next_chunk(cx),
134 ResponseBody::Other(ref mut body) => body.poll_next_chunk(cx),
135 }
136 }
137}
138
139impl<B: MessageBody + Unpin> Stream for ResponseBody<B> {
140 type Item = Result<Bytes, Box<dyn Error>>;
141
142 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143 match self.get_mut() {
144 ResponseBody::Body(ref mut body) => body.poll_next_chunk(cx),
145 ResponseBody::Other(ref mut body) => body.poll_next_chunk(cx),
146 }
147 }
148}
149
150pub enum Body {
152 None,
154 Empty,
156 Bytes(Bytes),
158 Message(Box<dyn MessageBody>),
160}
161
162impl Body {
163 pub fn from_slice(s: &[u8]) -> Body {
165 Body::Bytes(Bytes::copy_from_slice(s))
166 }
167
168 pub fn from_message<B: MessageBody>(body: B) -> Body {
170 Body::Message(Box::new(body))
171 }
172}
173
174impl MessageBody for Body {
175 #[inline]
176 fn size(&self) -> BodySize {
177 match self {
178 Body::None => BodySize::None,
179 Body::Empty => BodySize::Empty,
180 Body::Bytes(ref bin) => BodySize::Sized(bin.len() as u64),
181 Body::Message(ref body) => body.size(),
182 }
183 }
184
185 fn poll_next_chunk(
186 &mut self,
187 cx: &mut Context<'_>,
188 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
189 match self {
190 Body::None => Poll::Ready(None),
191 Body::Empty => Poll::Ready(None),
192 Body::Bytes(ref mut bin) => {
193 let len = bin.len();
194 if len == 0 {
195 Poll::Ready(None)
196 } else {
197 Poll::Ready(Some(Ok(mem::take(bin))))
198 }
199 }
200 Body::Message(ref mut body) => body.poll_next_chunk(cx),
201 }
202 }
203}
204
205impl PartialEq for Body {
206 fn eq(&self, other: &Body) -> bool {
207 match (self, other) {
208 (Body::None, Body::None) => true,
209 (Body::Empty, Body::Empty) => true,
210 (Body::Bytes(ref b), Body::Bytes(ref b2)) => b == b2,
211 _ => false,
212 }
213 }
214}
215
216impl fmt::Debug for Body {
217 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218 match *self {
219 Body::None => write!(f, "Body::None"),
220 Body::Empty => write!(f, "Body::Empty"),
221 Body::Bytes(ref b) => write!(f, "Body::Bytes({:?})", b),
222 Body::Message(_) => write!(f, "Body::Message(_)"),
223 }
224 }
225}
226
227impl From<&'static str> for Body {
228 fn from(s: &'static str) -> Body {
229 Body::Bytes(Bytes::from_static(s.as_ref()))
230 }
231}
232
233impl From<&'static [u8]> for Body {
234 fn from(s: &'static [u8]) -> Body {
235 Body::Bytes(Bytes::from_static(s))
236 }
237}
238
239impl From<Vec<u8>> for Body {
240 fn from(vec: Vec<u8>) -> Body {
241 Body::Bytes(Bytes::from(vec))
242 }
243}
244
245impl From<String> for Body {
246 fn from(s: String) -> Body {
247 s.into_bytes().into()
248 }
249}
250
251impl<'a> From<&'a String> for Body {
252 fn from(s: &'a String) -> Body {
253 Body::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
254 }
255}
256
257impl From<Bytes> for Body {
258 fn from(s: Bytes) -> Body {
259 Body::Bytes(s)
260 }
261}
262
263impl From<BytesMut> for Body {
264 fn from(s: BytesMut) -> Body {
265 Body::Bytes(s.freeze())
266 }
267}
268
269impl<S> From<SizedStream<S>> for Body
270where
271 S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin + 'static,
272{
273 fn from(s: SizedStream<S>) -> Body {
274 Body::from_message(s)
275 }
276}
277
278impl<S, E> From<BodyStream<S, E>> for Body
279where
280 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
281 E: Error + 'static,
282{
283 fn from(s: BodyStream<S, E>) -> Body {
284 Body::from_message(s)
285 }
286}
287
288impl<S> From<BoxedBodyStream<S>> for Body
289where
290 S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin + 'static,
291{
292 fn from(s: BoxedBodyStream<S>) -> Body {
293 Body::from_message(s)
294 }
295}
296
297impl MessageBody for Bytes {
298 fn size(&self) -> BodySize {
299 BodySize::Sized(self.len() as u64)
300 }
301
302 fn poll_next_chunk(
303 &mut self,
304 _: &mut Context<'_>,
305 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
306 if self.is_empty() {
307 Poll::Ready(None)
308 } else {
309 Poll::Ready(Some(Ok(mem::take(self))))
310 }
311 }
312}
313
314impl MessageBody for BytesMut {
315 fn size(&self) -> BodySize {
316 BodySize::Sized(self.len() as u64)
317 }
318
319 fn poll_next_chunk(
320 &mut self,
321 _: &mut Context<'_>,
322 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
323 if self.is_empty() {
324 Poll::Ready(None)
325 } else {
326 Poll::Ready(Some(Ok(mem::take(self).freeze())))
327 }
328 }
329}
330
331impl MessageBody for &'static str {
332 fn size(&self) -> BodySize {
333 BodySize::Sized(self.len() as u64)
334 }
335
336 fn poll_next_chunk(
337 &mut self,
338 _: &mut Context<'_>,
339 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
340 if self.is_empty() {
341 Poll::Ready(None)
342 } else {
343 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self).as_ref()))))
344 }
345 }
346}
347
348impl MessageBody for &'static [u8] {
349 fn size(&self) -> BodySize {
350 BodySize::Sized(self.len() as u64)
351 }
352
353 fn poll_next_chunk(
354 &mut self,
355 _: &mut Context<'_>,
356 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
357 if self.is_empty() {
358 Poll::Ready(None)
359 } else {
360 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self)))))
361 }
362 }
363}
364
365impl MessageBody for Vec<u8> {
366 fn size(&self) -> BodySize {
367 BodySize::Sized(self.len() as u64)
368 }
369
370 fn poll_next_chunk(
371 &mut self,
372 _: &mut Context<'_>,
373 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
374 if self.is_empty() {
375 Poll::Ready(None)
376 } else {
377 Poll::Ready(Some(Ok(Bytes::from(mem::take(self)))))
378 }
379 }
380}
381
382impl MessageBody for String {
383 fn size(&self) -> BodySize {
384 BodySize::Sized(self.len() as u64)
385 }
386
387 fn poll_next_chunk(
388 &mut self,
389 _: &mut Context<'_>,
390 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
391 if self.is_empty() {
392 Poll::Ready(None)
393 } else {
394 Poll::Ready(Some(Ok(Bytes::from(mem::take(self).into_bytes()))))
395 }
396 }
397}
398
399pub struct BodyStream<S, E> {
403 stream: S,
404 _t: PhantomData<E>,
405}
406
407impl<S, E> BodyStream<S, E>
408where
409 S: Stream<Item = Result<Bytes, E>> + Unpin,
410 E: Error,
411{
412 pub fn new(stream: S) -> Self {
413 BodyStream {
414 stream,
415 _t: PhantomData,
416 }
417 }
418}
419
420impl<S, E> fmt::Debug for BodyStream<S, E> {
421 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422 f.debug_struct("BodyStream")
423 .field("stream", &std::any::type_name::<S>())
424 .field("error", &std::any::type_name::<E>())
425 .finish()
426 }
427}
428
429impl<S, E> MessageBody for BodyStream<S, E>
430where
431 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
432 E: Error + 'static,
433{
434 fn size(&self) -> BodySize {
435 BodySize::Stream
436 }
437
438 fn poll_next_chunk(
444 &mut self,
445 cx: &mut Context<'_>,
446 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
447 loop {
448 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
449 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
450 Poll::Ready(opt) => opt.map(|res| res.map_err(Into::into)),
451 Poll::Pending => return Poll::Pending,
452 });
453 }
454 }
455}
456
457pub struct BoxedBodyStream<S> {
460 stream: S,
461}
462
463impl<S> BoxedBodyStream<S>
464where
465 S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin,
466{
467 pub fn new(stream: S) -> Self {
468 BoxedBodyStream { stream }
469 }
470}
471
472impl<S> fmt::Debug for BoxedBodyStream<S> {
473 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
474 f.debug_struct("BoxedBodyStream")
475 .field("stream", &std::any::type_name::<S>())
476 .finish()
477 }
478}
479
480impl<S> MessageBody for BoxedBodyStream<S>
481where
482 S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin + 'static,
483{
484 fn size(&self) -> BodySize {
485 BodySize::Stream
486 }
487
488 fn poll_next_chunk(
494 &mut self,
495 cx: &mut Context<'_>,
496 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
497 loop {
498 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
499 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
500 Poll::Ready(opt) => opt,
501 Poll::Pending => return Poll::Pending,
502 });
503 }
504 }
505}
506
507pub struct SizedStream<S> {
510 size: u64,
511 stream: S,
512}
513
514impl<S> SizedStream<S>
515where
516 S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin,
517{
518 pub fn new(size: u64, stream: S) -> Self {
519 SizedStream { size, stream }
520 }
521}
522
523impl<S> fmt::Debug for SizedStream<S> {
524 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525 f.debug_struct("SizedStream")
526 .field("size", &self.size)
527 .field("stream", &std::any::type_name::<S>())
528 .finish()
529 }
530}
531
532impl<S> MessageBody for SizedStream<S>
533where
534 S: Stream<Item = Result<Bytes, Box<dyn Error>>> + Unpin + 'static,
535{
536 fn size(&self) -> BodySize {
537 BodySize::Sized(self.size)
538 }
539
540 fn poll_next_chunk(
546 &mut self,
547 cx: &mut Context<'_>,
548 ) -> Poll<Option<Result<Bytes, Box<dyn Error>>>> {
549 loop {
550 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
551 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
552 Poll::Ready(val) => val,
553 Poll::Pending => return Poll::Pending,
554 });
555 }
556 }
557}
558
#[cfg(test)]
mod tests {
    use std::{future::poll_fn, io};

    use futures_util::stream;
    use ntex_util::future::Ready;

    use super::*;

    impl Body {
        /// Test helper: borrows the payload of a `Body::Bytes`.
        ///
        /// # Panics
        ///
        /// Panics for every other variant.
        pub(crate) fn get_ref(&self) -> &[u8] {
            match *self {
                Body::Bytes(ref bin) => bin,
                _ => panic!("get_ref() called on a body that is not Body::Bytes"),
            }
        }
    }

    /// `&'static str` as a `Body` source and as a `MessageBody`.
    #[ntex::test]
    async fn test_static_str() {
        assert_eq!(Body::from("").size(), BodySize::Sized(0));
        assert_eq!(Body::from("test").size(), BodySize::Sized(4));
        assert_eq!(Body::from("test").get_ref(), b"test");

        assert_eq!("test".size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| "".poll_next_chunk(cx)).await.is_none());

        // One value must hand out its content exactly once and then be done.
        let mut s = "test";
        assert_eq!(
            poll_fn(|cx| s.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(s.size(), BodySize::Sized(0));
        assert!(poll_fn(|cx| s.poll_next_chunk(cx)).await.is_none());
    }

    /// `&'static [u8]` as a `Body` source and as a `MessageBody`.
    #[ntex::test]
    async fn test_static_bytes() {
        assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
        assert_eq!(
            Body::from_slice(b"test".as_ref()).size(),
            BodySize::Sized(4)
        );
        assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");

        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| (&b"test"[..]).poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| (&b""[..]).poll_next_chunk(cx)).await.is_none());

        // One value must hand out its content exactly once and then be done.
        let mut s: &'static [u8] = b"test";
        assert_eq!(
            poll_fn(|cx| s.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(s.size(), BodySize::Sized(0));
        assert!(poll_fn(|cx| s.poll_next_chunk(cx)).await.is_none());
    }

    /// `Vec<u8>` as a `Body` source and as a `MessageBody`.
    #[ntex::test]
    async fn test_vec() {
        assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
        assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");

        assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| Vec::<u8>::new().poll_next_chunk(cx))
            .await
            .is_none());

        // One value must hand out its content exactly once and then be done.
        let mut v: Vec<u8> = Vec::from("test");
        assert_eq!(
            poll_fn(|cx| v.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(v.size(), BodySize::Sized(0));
        assert!(poll_fn(|cx| v.poll_next_chunk(cx)).await.is_none());
    }

    /// `Bytes` as a `Body` source and as a `MessageBody`.
    #[ntex::test]
    async fn test_bytes() {
        let mut b = Bytes::from("test");
        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b.clone()).get_ref(), b"test");

        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    /// `BytesMut` as a `Body` source and as a `MessageBody`.
    #[ntex::test]
    async fn test_bytes_mut() {
        let mut b = Body::from(BytesMut::from("test"));
        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(b.get_ref(), b"test");
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);

        let mut b = BytesMut::from("test");
        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(b.size(), BodySize::Sized(0));
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    /// `String` and `&String` as `Body` sources; `String` as a `MessageBody`.
    #[ntex::test]
    async fn test_string() {
        let mut b = "test".to_owned();
        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b.clone()).get_ref(), b"test");
        assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
        assert_eq!(Body::from(&b).get_ref(), b"test");

        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    /// `()` is an empty body.
    #[ntex::test]
    async fn test_unit() {
        assert_eq!(().size(), BodySize::Empty);
        assert!(poll_fn(|cx| ().poll_next_chunk(cx)).await.is_none());
    }

    /// `Box<T>` forwards to the boxed body.
    #[ntex::test]
    async fn test_box() {
        let mut val = Box::new(());
        assert_eq!(val.size(), BodySize::Empty);
        assert!(poll_fn(|cx| val.poll_next_chunk(cx)).await.is_none());
    }

    /// `PartialEq for Body`; comparing a value with itself is the point here.
    #[ntex::test]
    #[allow(clippy::eq_op)]
    async fn test_body_eq() {
        assert!(Body::None == Body::None);
        assert!(Body::None != Body::Empty);
        assert!(Body::Empty == Body::Empty);
        assert!(Body::Empty != Body::None);
        assert!(
            Body::Bytes(Bytes::from_static(b"1")) == Body::Bytes(Bytes::from_static(b"1"))
        );
        assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
    }

    /// `Debug for Body` prints the variant names.
    #[ntex::test]
    async fn test_body_debug() {
        assert!(format!("{:?}", Body::None).contains("Body::None"));
        assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
        assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains('1'));
    }

    /// `BodyStream`: `Debug`, conversion into `Body`, wrapping in `ResponseBody`.
    #[ntex::test]
    async fn body_stream() {
        let st = BodyStream::new(stream::once(Ready::<_, io::Error>::Ok(Bytes::from("1"))));
        assert!(format!("{:?}", st).contains("BodyStream"));
        let body: Body = st.into();
        assert!(format!("{:?}", body).contains("Body::Message(_)"));
        assert!(body != Body::None);

        let res = ResponseBody::new(body);
        assert!(res.as_ref().is_some());
    }

    /// `BoxedBodyStream`: `Debug`, conversion into `Body`, wrapping in `ResponseBody`.
    #[ntex::test]
    async fn boxed_body_stream() {
        let st = BoxedBodyStream::new(stream::once(Ready::<_, Box<dyn Error>>::Ok(
            Bytes::from("1"),
        )));
        assert!(format!("{:?}", st).contains("BoxedBodyStream"));
        let body: Body = st.into();
        assert!(format!("{:?}", body).contains("Body::Message(_)"));
        assert!(body != Body::None);

        let res = ResponseBody::new(body);
        assert!(res.as_ref().is_some());
    }

    /// `BodyStream` must not surface the empty chunk between "1" and "2".
    #[ntex::test]
    async fn body_skips_empty_chunks() {
        let mut body = BodyStream::new(stream::iter(
            ["1", "", "2"]
                .iter()
                .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, io::Error>),
        ));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("1")),
        );
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("2")),
        );
    }

    /// `SizedStream` must not surface the empty chunk between "1" and "2".
    #[ntex::test]
    async fn sized_skips_empty_chunks() {
        let mut body = SizedStream::new(
            2,
            stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
        );
        assert!(format!("{:?}", body).contains("SizedStream"));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("1")),
        );
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("2")),
        );
    }
}