1use std::{
3 error::Error, fmt, marker::PhantomData, mem, pin::Pin, rc::Rc, task::Context,
4 task::Poll,
5};
6
7use futures_core::Stream;
8use ntex_bytes::{Bytes, BytesMut};
9
10#[derive(Debug, PartialEq, Eq, Copy, Clone)]
11pub enum BodySize {
13 None,
14 Empty,
15 Sized(u64),
16 Stream,
17}
18
19impl BodySize {
20 pub fn is_eof(&self) -> bool {
21 matches!(self, BodySize::None | BodySize::Empty | BodySize::Sized(0))
22 }
23}
24
25pub trait MessageBody: 'static {
27 fn size(&self) -> BodySize;
29
30 fn poll_next_chunk(
31 &mut self,
32 cx: &mut Context<'_>,
33 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>>;
34}
35
36impl MessageBody for () {
37 #[inline]
38 fn size(&self) -> BodySize {
39 BodySize::Empty
40 }
41
42 #[inline]
43 fn poll_next_chunk(
44 &mut self,
45 _: &mut Context<'_>,
46 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
47 Poll::Ready(None)
48 }
49}
50
51impl<T: MessageBody> MessageBody for Box<T> {
52 #[inline]
53 fn size(&self) -> BodySize {
54 self.as_ref().size()
55 }
56
57 #[inline]
58 fn poll_next_chunk(
59 &mut self,
60 cx: &mut Context<'_>,
61 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
62 self.as_mut().poll_next_chunk(cx)
63 }
64}
65
66#[derive(Debug)]
67pub enum ResponseBody<B> {
69 Body(B),
70 Other(Body),
71}
72
73impl ResponseBody<Body> {
74 pub fn into_body<B>(self) -> ResponseBody<B> {
75 match self {
76 ResponseBody::Body(b) | ResponseBody::Other(b) => ResponseBody::Other(b),
77 }
78 }
79}
80
81impl From<ResponseBody<Body>> for Body {
82 fn from(b: ResponseBody<Body>) -> Self {
83 match b {
84 ResponseBody::Body(b) | ResponseBody::Other(b) => b,
85 }
86 }
87}
88
89impl<B> From<Body> for ResponseBody<B> {
90 fn from(b: Body) -> Self {
91 ResponseBody::Other(b)
92 }
93}
94
95impl<B> ResponseBody<B> {
96 #[inline]
97 pub fn new(body: B) -> Self {
98 ResponseBody::Body(body)
99 }
100
101 #[inline]
102 #[must_use]
103 pub fn take_body(&mut self) -> ResponseBody<B> {
104 std::mem::replace(self, ResponseBody::Other(Body::None))
105 }
106}
107
108impl<B: MessageBody> ResponseBody<B> {
109 pub fn as_ref(&self) -> Option<&B> {
110 if let ResponseBody::Body(b) = self {
111 Some(b)
112 } else {
113 None
114 }
115 }
116}
117
118impl<B: MessageBody> MessageBody for ResponseBody<B> {
119 #[inline]
120 fn size(&self) -> BodySize {
121 match self {
122 ResponseBody::Body(body) => body.size(),
123 ResponseBody::Other(body) => body.size(),
124 }
125 }
126
127 #[inline]
128 fn poll_next_chunk(
129 &mut self,
130 cx: &mut Context<'_>,
131 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
132 match self {
133 ResponseBody::Body(body) => body.poll_next_chunk(cx),
134 ResponseBody::Other(body) => body.poll_next_chunk(cx),
135 }
136 }
137}
138
139impl<B: MessageBody + Unpin> Stream for ResponseBody<B> {
140 type Item = Result<Bytes, Rc<dyn Error>>;
141
142 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
143 match self.get_mut() {
144 ResponseBody::Body(body) => body.poll_next_chunk(cx),
145 ResponseBody::Other(body) => body.poll_next_chunk(cx),
146 }
147 }
148}
149
150pub enum Body {
152 None,
154 Empty,
156 Bytes(Bytes),
158 Message(Box<dyn MessageBody>),
160}
161
162impl Body {
163 pub fn from_slice(s: &[u8]) -> Body {
165 Body::Bytes(Bytes::copy_from_slice(s))
166 }
167
168 pub fn from_message<B: MessageBody>(body: B) -> Body {
170 Body::Message(Box::new(body))
171 }
172}
173
174impl MessageBody for Body {
175 #[inline]
176 fn size(&self) -> BodySize {
177 match self {
178 Body::None => BodySize::None,
179 Body::Empty => BodySize::Empty,
180 Body::Bytes(bin) => BodySize::Sized(bin.len() as u64),
181 Body::Message(body) => body.size(),
182 }
183 }
184
185 fn poll_next_chunk(
186 &mut self,
187 cx: &mut Context<'_>,
188 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
189 match self {
190 Body::None | Body::Empty => Poll::Ready(None),
191 Body::Bytes(bin) => {
192 let len = bin.len();
193 if len == 0 {
194 Poll::Ready(None)
195 } else {
196 Poll::Ready(Some(Ok(mem::take(bin))))
197 }
198 }
199 Body::Message(body) => body.poll_next_chunk(cx),
200 }
201 }
202}
203
204impl PartialEq for Body {
205 fn eq(&self, other: &Body) -> bool {
206 match (self, other) {
207 (Body::None, Body::None) | (Body::Empty, Body::Empty) => true,
208 (Body::Bytes(b), Body::Bytes(b2)) => b == b2,
209 _ => false,
210 }
211 }
212}
213
214impl fmt::Debug for Body {
215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216 match *self {
217 Body::None => write!(f, "Body::None"),
218 Body::Empty => write!(f, "Body::Empty"),
219 Body::Bytes(ref b) => write!(f, "Body::Bytes({b:?})"),
220 Body::Message(_) => write!(f, "Body::Message(_)"),
221 }
222 }
223}
224
225impl From<&'static str> for Body {
226 fn from(s: &'static str) -> Body {
227 Body::Bytes(Bytes::from_static(s.as_ref()))
228 }
229}
230
231impl From<&'static [u8]> for Body {
232 fn from(s: &'static [u8]) -> Body {
233 Body::Bytes(Bytes::from_static(s))
234 }
235}
236
237impl From<Vec<u8>> for Body {
238 fn from(vec: Vec<u8>) -> Body {
239 Body::Bytes(Bytes::from(vec))
240 }
241}
242
243impl From<String> for Body {
244 fn from(s: String) -> Body {
245 s.into_bytes().into()
246 }
247}
248
249impl<'a> From<&'a String> for Body {
250 fn from(s: &'a String) -> Body {
251 Body::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
252 }
253}
254
255impl From<Bytes> for Body {
256 fn from(s: Bytes) -> Body {
257 Body::Bytes(s)
258 }
259}
260
261impl From<BytesMut> for Body {
262 fn from(s: BytesMut) -> Body {
263 Body::Bytes(s.freeze())
264 }
265}
266
267impl<S> From<SizedStream<S>> for Body
268where
269 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
270{
271 fn from(s: SizedStream<S>) -> Body {
272 Body::from_message(s)
273 }
274}
275
276impl<S, E> From<BodyStream<S, E>> for Body
277where
278 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
279 E: Error + 'static,
280{
281 fn from(s: BodyStream<S, E>) -> Body {
282 Body::from_message(s)
283 }
284}
285
286impl<S> From<BoxedBodyStream<S>> for Body
287where
288 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
289{
290 fn from(s: BoxedBodyStream<S>) -> Body {
291 Body::from_message(s)
292 }
293}
294
295impl MessageBody for Bytes {
296 fn size(&self) -> BodySize {
297 BodySize::Sized(self.len() as u64)
298 }
299
300 fn poll_next_chunk(
301 &mut self,
302 _: &mut Context<'_>,
303 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
304 if self.is_empty() {
305 Poll::Ready(None)
306 } else {
307 Poll::Ready(Some(Ok(mem::take(self))))
308 }
309 }
310}
311
312impl MessageBody for BytesMut {
313 fn size(&self) -> BodySize {
314 BodySize::Sized(self.len() as u64)
315 }
316
317 fn poll_next_chunk(
318 &mut self,
319 _: &mut Context<'_>,
320 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
321 if self.is_empty() {
322 Poll::Ready(None)
323 } else {
324 Poll::Ready(Some(Ok(mem::take(self).freeze())))
325 }
326 }
327}
328
329impl MessageBody for &'static str {
330 fn size(&self) -> BodySize {
331 BodySize::Sized(self.len() as u64)
332 }
333
334 fn poll_next_chunk(
335 &mut self,
336 _: &mut Context<'_>,
337 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
338 if self.is_empty() {
339 Poll::Ready(None)
340 } else {
341 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self).as_ref()))))
342 }
343 }
344}
345
346impl MessageBody for &'static [u8] {
347 fn size(&self) -> BodySize {
348 BodySize::Sized(self.len() as u64)
349 }
350
351 fn poll_next_chunk(
352 &mut self,
353 _: &mut Context<'_>,
354 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
355 if self.is_empty() {
356 Poll::Ready(None)
357 } else {
358 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self)))))
359 }
360 }
361}
362
363impl MessageBody for Vec<u8> {
364 fn size(&self) -> BodySize {
365 BodySize::Sized(self.len() as u64)
366 }
367
368 fn poll_next_chunk(
369 &mut self,
370 _: &mut Context<'_>,
371 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
372 if self.is_empty() {
373 Poll::Ready(None)
374 } else {
375 Poll::Ready(Some(Ok(Bytes::from(mem::take(self)))))
376 }
377 }
378}
379
380impl MessageBody for String {
381 fn size(&self) -> BodySize {
382 BodySize::Sized(self.len() as u64)
383 }
384
385 fn poll_next_chunk(
386 &mut self,
387 _: &mut Context<'_>,
388 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
389 if self.is_empty() {
390 Poll::Ready(None)
391 } else {
392 Poll::Ready(Some(Ok(Bytes::from(mem::take(self).into_bytes()))))
393 }
394 }
395}
396
397pub struct BodyStream<S, E> {
401 stream: S,
402 _t: PhantomData<E>,
403}
404
405impl<S, E> BodyStream<S, E>
406where
407 S: Stream<Item = Result<Bytes, E>> + Unpin,
408 E: Error,
409{
410 pub fn new(stream: S) -> Self {
411 BodyStream {
412 stream,
413 _t: PhantomData,
414 }
415 }
416}
417
418impl<S, E> fmt::Debug for BodyStream<S, E> {
419 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
420 f.debug_struct("BodyStream")
421 .field("stream", &std::any::type_name::<S>())
422 .field("error", &std::any::type_name::<E>())
423 .finish()
424 }
425}
426
427impl<S, E> MessageBody for BodyStream<S, E>
428where
429 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
430 E: Error + 'static,
431{
432 fn size(&self) -> BodySize {
433 BodySize::Stream
434 }
435
436 fn poll_next_chunk(
442 &mut self,
443 cx: &mut Context<'_>,
444 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
445 loop {
446 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
447 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
448 Poll::Ready(opt) => opt.map(|res| {
449 res.map_err(|e| {
450 let e: Rc<dyn Error> = Rc::new(e);
451 e
452 })
453 }),
454 Poll::Pending => return Poll::Pending,
455 });
456 }
457 }
458}
459
460pub struct BoxedBodyStream<S> {
463 stream: S,
464}
465
466impl<S> BoxedBodyStream<S>
467where
468 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
469{
470 pub fn new(stream: S) -> Self {
471 BoxedBodyStream { stream }
472 }
473}
474
475impl<S> fmt::Debug for BoxedBodyStream<S> {
476 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
477 f.debug_struct("BoxedBodyStream")
478 .field("stream", &std::any::type_name::<S>())
479 .finish()
480 }
481}
482
483impl<S> MessageBody for BoxedBodyStream<S>
484where
485 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
486{
487 fn size(&self) -> BodySize {
488 BodySize::Stream
489 }
490
491 fn poll_next_chunk(
497 &mut self,
498 cx: &mut Context<'_>,
499 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
500 loop {
501 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
502 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
503 Poll::Ready(opt) => opt,
504 Poll::Pending => return Poll::Pending,
505 });
506 }
507 }
508}
509
510pub struct SizedStream<S> {
513 size: u64,
514 stream: S,
515}
516
517impl<S> SizedStream<S>
518where
519 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
520{
521 pub fn new(size: u64, stream: S) -> Self {
522 SizedStream { size, stream }
523 }
524}
525
526impl<S> fmt::Debug for SizedStream<S> {
527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528 f.debug_struct("SizedStream")
529 .field("size", &self.size)
530 .field("stream", &std::any::type_name::<S>())
531 .finish()
532 }
533}
534
535impl<S> MessageBody for SizedStream<S>
536where
537 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
538{
539 fn size(&self) -> BodySize {
540 BodySize::Sized(self.size)
541 }
542
543 fn poll_next_chunk(
549 &mut self,
550 cx: &mut Context<'_>,
551 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
552 loop {
553 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
554 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
555 Poll::Ready(val) => val,
556 Poll::Pending => return Poll::Pending,
557 });
558 }
559 }
560}
561
562#[cfg(test)]
563mod tests {
564 use std::{future::poll_fn, io};
565
566 use futures_util::stream;
567 use ntex::util::Ready;
568
569 use super::*;
570
571 impl Body {
572 pub(crate) fn get_ref(&self) -> &[u8] {
573 if let Body::Bytes(bin) = self { bin } else { panic!() }
574 }
575 }
576
577 #[ntex::test]
578 async fn test_static_str() {
579 assert_eq!(Body::from("").size(), BodySize::Sized(0));
580 assert_eq!(Body::from("test").size(), BodySize::Sized(4));
581 assert_eq!(Body::from("test").get_ref(), b"test");
582
583 assert_eq!("test".size(), BodySize::Sized(4));
584 assert_eq!(
585 poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
586 Some(Bytes::from("test"))
587 );
588 assert_eq!(
589 poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
590 Some(Bytes::from("test"))
591 );
592 assert!(poll_fn(|cx| "".poll_next_chunk(cx)).await.is_none());
593 }
594
595 #[ntex::test]
596 async fn test_static_bytes() {
597 assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
598 assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
599 assert_eq!(
600 Body::from_slice(b"test".as_ref()).size(),
601 BodySize::Sized(4)
602 );
603 assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
604
605 assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
606 assert_eq!(
607 poll_fn(|cx| (&b"test"[..]).poll_next_chunk(cx))
608 .await
609 .unwrap()
610 .ok(),
611 Some(Bytes::from("test"))
612 );
613 assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
614 assert!(poll_fn(|cx| (&b""[..]).poll_next_chunk(cx)).await.is_none());
615 }
616
617 #[ntex::test]
618 async fn test_vec() {
619 assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
620 assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");
621
622 assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
623 assert_eq!(
624 poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
625 .await
626 .unwrap()
627 .ok(),
628 Some(Bytes::from("test"))
629 );
630 assert_eq!(
631 poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
632 .await
633 .unwrap()
634 .ok(),
635 Some(Bytes::from("test"))
636 );
637 assert!(
638 poll_fn(|cx| Vec::<u8>::new().poll_next_chunk(cx))
639 .await
640 .is_none()
641 );
642 }
643
644 #[ntex::test]
645 async fn test_bytes() {
646 let mut b = Bytes::from("test");
647 assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
648 assert_eq!(Body::from(b.clone()).get_ref(), b"test");
649
650 assert_eq!(b.size(), BodySize::Sized(4));
651 assert_eq!(
652 poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
653 Some(Bytes::from("test"))
654 );
655 assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
656 }
657
658 #[ntex::test]
659 async fn test_bytes_mut() {
660 let mut b = Body::from(BytesMut::from("test"));
661 assert_eq!(b.size(), BodySize::Sized(4));
662 assert_eq!(b.get_ref(), b"test");
663 assert_eq!(
664 poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
665 Some(Bytes::from("test"))
666 );
667 assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
668
669 let mut b = BytesMut::from("test");
670 assert_eq!(b.size(), BodySize::Sized(4));
671 assert_eq!(
672 poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
673 Some(Bytes::from("test"))
674 );
675 assert_eq!(b.size(), BodySize::Sized(0));
676 assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
677 }
678
679 #[ntex::test]
680 async fn test_string() {
681 let mut b = "test".to_owned();
682 assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
683 assert_eq!(Body::from(b.clone()).get_ref(), b"test");
684 assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
685 assert_eq!(Body::from(&b).get_ref(), b"test");
686
687 assert_eq!(b.size(), BodySize::Sized(4));
688 assert_eq!(
689 poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
690 Some(Bytes::from("test"))
691 );
692 assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
693 }
694
695 #[ntex::test]
696 async fn test_unit() {
697 assert_eq!(().size(), BodySize::Empty);
698 assert!(poll_fn(|cx| ().poll_next_chunk(cx)).await.is_none());
699 }
700
701 #[ntex::test]
702 async fn test_box() {
703 let mut val = Box::new(());
704 assert_eq!(val.size(), BodySize::Empty);
705 assert!(poll_fn(|cx| val.poll_next_chunk(cx)).await.is_none());
706 }
707
708 #[ntex::test]
709 #[allow(clippy::eq_op)]
710 async fn test_body_eq() {
711 assert!(Body::None == Body::None);
712 assert!(Body::None != Body::Empty);
713 assert!(Body::Empty == Body::Empty);
714 assert!(Body::Empty != Body::None);
715 assert!(
716 Body::Bytes(Bytes::from_static(b"1")) == Body::Bytes(Bytes::from_static(b"1"))
717 );
718 assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
719 }
720
721 #[ntex::test]
722 async fn test_body_debug() {
723 assert!(format!("{:?}", Body::None).contains("Body::None"));
724 assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
725 assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains('1'));
726 }
727
728 #[ntex::test]
729 async fn body_stream() {
730 let st = BodyStream::new(stream::once(Ready::<_, io::Error>::Ok(Bytes::from("1"))));
731 assert!(format!("{st:?}").contains("BodyStream"));
732 let body: Body = st.into();
733 assert!(format!("{body:?}").contains("Body::Message(_)"));
734 assert!(body != Body::None);
735
736 let res = ResponseBody::new(body);
737 assert!(res.as_ref().is_some());
738 }
739
740 #[ntex::test]
741 async fn boxed_body_stream() {
742 let st = BoxedBodyStream::new(stream::once(Ready::<_, Rc<dyn Error>>::Ok(
743 Bytes::from("1"),
744 )));
745 assert!(format!("{st:?}").contains("BoxedBodyStream"));
746 let body: Body = st.into();
747 assert!(format!("{body:?}").contains("Body::Message(_)"));
748 assert!(body != Body::None);
749
750 let res = ResponseBody::new(body);
751 assert!(res.as_ref().is_some());
752 }
753
754 #[ntex::test]
755 async fn body_skips_empty_chunks() {
756 let mut body = BodyStream::new(stream::iter(
757 ["1", "", "2"]
758 .iter()
759 .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, io::Error>),
760 ));
761 assert_eq!(
762 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
763 Some(Bytes::from("1")),
764 );
765 assert_eq!(
766 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
767 Some(Bytes::from("2")),
768 );
769 }
770
771 #[ntex::test]
772 async fn sized_skips_empty_chunks() {
773 let mut body = SizedStream::new(
774 2,
775 stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
776 );
777 assert!(format!("{body:?}").contains("SizedStream"));
778 assert_eq!(
779 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
780 Some(Bytes::from("1")),
781 );
782 assert_eq!(
783 poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
784 Some(Bytes::from("2")),
785 );
786 }
787}