1use std::{
3 error::Error, fmt, marker::PhantomData, mem, pin::Pin, rc::Rc, task::Context,
4 task::Poll,
5};
6
7use futures_core::Stream;
8use ntex_bytes::{Bytes, BytesMut};
9
/// Size information a message body reports about itself.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum BodySize {
    /// Reported by `Body::None`; counts as end-of-body for [`BodySize::is_eof`].
    None,
    /// Reported by `Body::Empty` and by the unit type `()`.
    Empty,
    /// Exactly-known length in bytes.
    Sized(u64),
    /// Reported by stream-backed bodies that do not carry a length.
    Stream,
}

impl BodySize {
    /// Returns `true` when this size describes a body with nothing to read:
    /// `None`, `Empty`, or `Sized(0)`.
    pub fn is_eof(&self) -> bool {
        match *self {
            BodySize::None | BodySize::Empty => true,
            BodySize::Sized(len) => len == 0,
            BodySize::Stream => false,
        }
    }
}
24
/// Interface implemented by every type that can be used as a message body.
///
/// Implementors must be `'static`.
pub trait MessageBody: 'static {
    /// Reports the size of this body as a [`BodySize`].
    fn size(&self) -> BodySize;

    /// Polls for the next chunk of body data.
    ///
    /// The implementations in this file return `Poll::Ready(None)` once no
    /// more data is available, `Poll::Ready(Some(Ok(bytes)))` for a chunk and
    /// `Poll::Ready(Some(Err(_)))` when the underlying source fails.
    fn poll_next_chunk(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>>;
}
35
36impl MessageBody for () {
37 #[inline]
38 fn size(&self) -> BodySize {
39 BodySize::Empty
40 }
41
42 #[inline]
43 fn poll_next_chunk(
44 &mut self,
45 _: &mut Context<'_>,
46 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
47 Poll::Ready(None)
48 }
49}
50
51impl<T: MessageBody> MessageBody for Box<T> {
52 #[inline]
53 fn size(&self) -> BodySize {
54 self.as_ref().size()
55 }
56
57 #[inline]
58 fn poll_next_chunk(
59 &mut self,
60 cx: &mut Context<'_>,
61 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
62 self.as_mut().poll_next_chunk(cx)
63 }
64}
65
/// A response body that is either a body of the generic type `B` or a
/// plain [`Body`].
#[derive(Debug)]
pub enum ResponseBody<B> {
    /// Body of the caller-chosen type `B`.
    Body(B),
    /// Body stored as the crate's [`Body`] enum.
    Other(Body),
}
72
73impl ResponseBody<Body> {
74 pub fn into_body<B>(self) -> ResponseBody<B> {
75 match self {
76 ResponseBody::Body(b) => ResponseBody::Other(b),
77 ResponseBody::Other(b) => ResponseBody::Other(b),
78 }
79 }
80}
81
82impl From<ResponseBody<Body>> for Body {
83 fn from(b: ResponseBody<Body>) -> Self {
84 match b {
85 ResponseBody::Body(b) => b,
86 ResponseBody::Other(b) => b,
87 }
88 }
89}
90
91impl<B> From<Body> for ResponseBody<B> {
92 fn from(b: Body) -> Self {
93 ResponseBody::Other(b)
94 }
95}
96
97impl<B> ResponseBody<B> {
98 #[inline]
99 pub fn new(body: B) -> Self {
100 ResponseBody::Body(body)
101 }
102
103 #[inline]
104 pub fn take_body(&mut self) -> ResponseBody<B> {
105 std::mem::replace(self, ResponseBody::Other(Body::None))
106 }
107}
108
109impl<B: MessageBody> ResponseBody<B> {
110 pub fn as_ref(&self) -> Option<&B> {
111 if let ResponseBody::Body(ref b) = self {
112 Some(b)
113 } else {
114 None
115 }
116 }
117}
118
119impl<B: MessageBody> MessageBody for ResponseBody<B> {
120 #[inline]
121 fn size(&self) -> BodySize {
122 match self {
123 ResponseBody::Body(ref body) => body.size(),
124 ResponseBody::Other(ref body) => body.size(),
125 }
126 }
127
128 #[inline]
129 fn poll_next_chunk(
130 &mut self,
131 cx: &mut Context<'_>,
132 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
133 match self {
134 ResponseBody::Body(ref mut body) => body.poll_next_chunk(cx),
135 ResponseBody::Other(ref mut body) => body.poll_next_chunk(cx),
136 }
137 }
138}
139
140impl<B: MessageBody + Unpin> Stream for ResponseBody<B> {
141 type Item = Result<Bytes, Rc<dyn Error>>;
142
143 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144 match self.get_mut() {
145 ResponseBody::Body(ref mut body) => body.poll_next_chunk(cx),
146 ResponseBody::Other(ref mut body) => body.poll_next_chunk(cx),
147 }
148 }
149}
150
/// A message body in one of the representations handled by this module.
pub enum Body {
    /// Reports `BodySize::None` and yields no chunks.
    None,
    /// Reports `BodySize::Empty` and yields no chunks.
    Empty,
    /// In-memory payload; handed out as a single chunk when polled.
    Bytes(Bytes),
    /// Any other [`MessageBody`] implementation, boxed as a trait object.
    Message(Box<dyn MessageBody>),
}
162
163impl Body {
164 pub fn from_slice(s: &[u8]) -> Body {
166 Body::Bytes(Bytes::copy_from_slice(s))
167 }
168
169 pub fn from_message<B: MessageBody>(body: B) -> Body {
171 Body::Message(Box::new(body))
172 }
173}
174
175impl MessageBody for Body {
176 #[inline]
177 fn size(&self) -> BodySize {
178 match self {
179 Body::None => BodySize::None,
180 Body::Empty => BodySize::Empty,
181 Body::Bytes(ref bin) => BodySize::Sized(bin.len() as u64),
182 Body::Message(ref body) => body.size(),
183 }
184 }
185
186 fn poll_next_chunk(
187 &mut self,
188 cx: &mut Context<'_>,
189 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
190 match self {
191 Body::None => Poll::Ready(None),
192 Body::Empty => Poll::Ready(None),
193 Body::Bytes(ref mut bin) => {
194 let len = bin.len();
195 if len == 0 {
196 Poll::Ready(None)
197 } else {
198 Poll::Ready(Some(Ok(mem::take(bin))))
199 }
200 }
201 Body::Message(ref mut body) => body.poll_next_chunk(cx),
202 }
203 }
204}
205
206impl PartialEq for Body {
207 fn eq(&self, other: &Body) -> bool {
208 match (self, other) {
209 (Body::None, Body::None) => true,
210 (Body::Empty, Body::Empty) => true,
211 (Body::Bytes(ref b), Body::Bytes(ref b2)) => b == b2,
212 _ => false,
213 }
214 }
215}
216
217impl fmt::Debug for Body {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 match *self {
220 Body::None => write!(f, "Body::None"),
221 Body::Empty => write!(f, "Body::Empty"),
222 Body::Bytes(ref b) => write!(f, "Body::Bytes({b:?})"),
223 Body::Message(_) => write!(f, "Body::Message(_)"),
224 }
225 }
226}
227
228impl From<&'static str> for Body {
229 fn from(s: &'static str) -> Body {
230 Body::Bytes(Bytes::from_static(s.as_ref()))
231 }
232}
233
234impl From<&'static [u8]> for Body {
235 fn from(s: &'static [u8]) -> Body {
236 Body::Bytes(Bytes::from_static(s))
237 }
238}
239
240impl From<Vec<u8>> for Body {
241 fn from(vec: Vec<u8>) -> Body {
242 Body::Bytes(Bytes::from(vec))
243 }
244}
245
246impl From<String> for Body {
247 fn from(s: String) -> Body {
248 s.into_bytes().into()
249 }
250}
251
252impl<'a> From<&'a String> for Body {
253 fn from(s: &'a String) -> Body {
254 Body::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
255 }
256}
257
258impl From<Bytes> for Body {
259 fn from(s: Bytes) -> Body {
260 Body::Bytes(s)
261 }
262}
263
264impl From<BytesMut> for Body {
265 fn from(s: BytesMut) -> Body {
266 Body::Bytes(s.freeze())
267 }
268}
269
270impl<S> From<SizedStream<S>> for Body
271where
272 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
273{
274 fn from(s: SizedStream<S>) -> Body {
275 Body::from_message(s)
276 }
277}
278
279impl<S, E> From<BodyStream<S, E>> for Body
280where
281 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
282 E: Error + 'static,
283{
284 fn from(s: BodyStream<S, E>) -> Body {
285 Body::from_message(s)
286 }
287}
288
289impl<S> From<BoxedBodyStream<S>> for Body
290where
291 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
292{
293 fn from(s: BoxedBodyStream<S>) -> Body {
294 Body::from_message(s)
295 }
296}
297
298impl MessageBody for Bytes {
299 fn size(&self) -> BodySize {
300 BodySize::Sized(self.len() as u64)
301 }
302
303 fn poll_next_chunk(
304 &mut self,
305 _: &mut Context<'_>,
306 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
307 if self.is_empty() {
308 Poll::Ready(None)
309 } else {
310 Poll::Ready(Some(Ok(mem::take(self))))
311 }
312 }
313}
314
315impl MessageBody for BytesMut {
316 fn size(&self) -> BodySize {
317 BodySize::Sized(self.len() as u64)
318 }
319
320 fn poll_next_chunk(
321 &mut self,
322 _: &mut Context<'_>,
323 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
324 if self.is_empty() {
325 Poll::Ready(None)
326 } else {
327 Poll::Ready(Some(Ok(mem::take(self).freeze())))
328 }
329 }
330}
331
332impl MessageBody for &'static str {
333 fn size(&self) -> BodySize {
334 BodySize::Sized(self.len() as u64)
335 }
336
337 fn poll_next_chunk(
338 &mut self,
339 _: &mut Context<'_>,
340 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
341 if self.is_empty() {
342 Poll::Ready(None)
343 } else {
344 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self).as_ref()))))
345 }
346 }
347}
348
349impl MessageBody for &'static [u8] {
350 fn size(&self) -> BodySize {
351 BodySize::Sized(self.len() as u64)
352 }
353
354 fn poll_next_chunk(
355 &mut self,
356 _: &mut Context<'_>,
357 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
358 if self.is_empty() {
359 Poll::Ready(None)
360 } else {
361 Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self)))))
362 }
363 }
364}
365
366impl MessageBody for Vec<u8> {
367 fn size(&self) -> BodySize {
368 BodySize::Sized(self.len() as u64)
369 }
370
371 fn poll_next_chunk(
372 &mut self,
373 _: &mut Context<'_>,
374 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
375 if self.is_empty() {
376 Poll::Ready(None)
377 } else {
378 Poll::Ready(Some(Ok(Bytes::from(mem::take(self)))))
379 }
380 }
381}
382
383impl MessageBody for String {
384 fn size(&self) -> BodySize {
385 BodySize::Sized(self.len() as u64)
386 }
387
388 fn poll_next_chunk(
389 &mut self,
390 _: &mut Context<'_>,
391 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
392 if self.is_empty() {
393 Poll::Ready(None)
394 } else {
395 Poll::Ready(Some(Ok(Bytes::from(mem::take(self).into_bytes()))))
396 }
397 }
398}
399
/// Adapter that lets a stream of `Result<Bytes, E>` act as a message body
/// of unknown size.
pub struct BodyStream<S, E> {
    // The wrapped stream that produces the body chunks.
    stream: S,
    // Records the error type `E` in the wrapper's type without storing a value.
    _t: PhantomData<E>,
}
407
408impl<S, E> BodyStream<S, E>
409where
410 S: Stream<Item = Result<Bytes, E>> + Unpin,
411 E: Error,
412{
413 pub fn new(stream: S) -> Self {
414 BodyStream {
415 stream,
416 _t: PhantomData,
417 }
418 }
419}
420
421impl<S, E> fmt::Debug for BodyStream<S, E> {
422 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
423 f.debug_struct("BodyStream")
424 .field("stream", &std::any::type_name::<S>())
425 .field("error", &std::any::type_name::<E>())
426 .finish()
427 }
428}
429
430impl<S, E> MessageBody for BodyStream<S, E>
431where
432 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
433 E: Error + 'static,
434{
435 fn size(&self) -> BodySize {
436 BodySize::Stream
437 }
438
439 fn poll_next_chunk(
445 &mut self,
446 cx: &mut Context<'_>,
447 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
448 loop {
449 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
450 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
451 Poll::Ready(opt) => opt.map(|res| {
452 res.map_err(|e| {
453 let e: Rc<dyn Error> = Rc::new(e);
454 e
455 })
456 }),
457 Poll::Pending => return Poll::Pending,
458 });
459 }
460 }
461}
462
/// Adapter that lets a stream of `Result<Bytes, Rc<dyn Error>>` act as a
/// message body of unknown size.
pub struct BoxedBodyStream<S> {
    // The wrapped stream that produces the body chunks.
    stream: S,
}
468
469impl<S> BoxedBodyStream<S>
470where
471 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
472{
473 pub fn new(stream: S) -> Self {
474 BoxedBodyStream { stream }
475 }
476}
477
478impl<S> fmt::Debug for BoxedBodyStream<S> {
479 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
480 f.debug_struct("BoxedBodyStream")
481 .field("stream", &std::any::type_name::<S>())
482 .finish()
483 }
484}
485
486impl<S> MessageBody for BoxedBodyStream<S>
487where
488 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
489{
490 fn size(&self) -> BodySize {
491 BodySize::Stream
492 }
493
494 fn poll_next_chunk(
500 &mut self,
501 cx: &mut Context<'_>,
502 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
503 loop {
504 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
505 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
506 Poll::Ready(opt) => opt,
507 Poll::Pending => return Poll::Pending,
508 });
509 }
510 }
511}
512
/// Adapter that lets a stream act as a message body with a caller-declared
/// size.
pub struct SizedStream<S> {
    // Value reported through `MessageBody::size` as `BodySize::Sized`; this
    // file does not check it against the bytes the stream actually yields.
    size: u64,
    // The wrapped stream that produces the body chunks.
    stream: S,
}
519
520impl<S> SizedStream<S>
521where
522 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin,
523{
524 pub fn new(size: u64, stream: S) -> Self {
525 SizedStream { size, stream }
526 }
527}
528
529impl<S> fmt::Debug for SizedStream<S> {
530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531 f.debug_struct("SizedStream")
532 .field("size", &self.size)
533 .field("stream", &std::any::type_name::<S>())
534 .finish()
535 }
536}
537
538impl<S> MessageBody for SizedStream<S>
539where
540 S: Stream<Item = Result<Bytes, Rc<dyn Error>>> + Unpin + 'static,
541{
542 fn size(&self) -> BodySize {
543 BodySize::Sized(self.size)
544 }
545
546 fn poll_next_chunk(
552 &mut self,
553 cx: &mut Context<'_>,
554 ) -> Poll<Option<Result<Bytes, Rc<dyn Error>>>> {
555 loop {
556 return Poll::Ready(match Pin::new(&mut self.stream).poll_next(cx) {
557 Poll::Ready(Some(Ok(ref bytes))) if bytes.is_empty() => continue,
558 Poll::Ready(val) => val,
559 Poll::Pending => return Poll::Pending,
560 });
561 }
562 }
563}
564
// Unit tests for the body types and the `MessageBody` implementations above.
#[cfg(test)]
mod tests {
    use std::{future::poll_fn, io};

    use futures_util::stream;
    use ntex_util::future::Ready;

    use super::*;

    impl Body {
        // Test-only accessor: returns the payload of `Body::Bytes` and panics
        // for every other variant.
        pub(crate) fn get_ref(&self) -> &[u8] {
            match *self {
                Body::Bytes(ref bin) => bin,
                _ => panic!(),
            }
        }
    }

    // `&'static str` converted into `Body`, and used directly as a `MessageBody`.
    #[ntex::test]
    async fn test_static_str() {
        assert_eq!(Body::from("").size(), BodySize::Sized(0));
        assert_eq!(Body::from("test").size(), BodySize::Sized(4));
        assert_eq!(Body::from("test").get_ref(), b"test");

        assert_eq!("test".size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(
            poll_fn(|cx| "test".poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        // An empty string yields no chunk at all.
        assert!(poll_fn(|cx| "".poll_next_chunk(cx)).await.is_none());
    }

    // `&'static [u8]` and `Body::from_slice`, plus the slice as a `MessageBody`.
    #[ntex::test]
    async fn test_static_bytes() {
        assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
        assert_eq!(
            Body::from_slice(b"test".as_ref()).size(),
            BodySize::Sized(4)
        );
        assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");

        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| (&b"test"[..]).poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
        // An empty slice yields no chunk at all.
        assert!(poll_fn(|cx| (&b""[..]).poll_next_chunk(cx)).await.is_none());
    }

    // `Vec<u8>` converted into `Body`, and used directly as a `MessageBody`.
    #[ntex::test]
    async fn test_vec() {
        assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
        assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");

        assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        assert_eq!(
            poll_fn(|cx| Vec::from("test").poll_next_chunk(cx))
                .await
                .unwrap()
                .ok(),
            Some(Bytes::from("test"))
        );
        // An empty vector yields no chunk at all.
        assert!(poll_fn(|cx| Vec::<u8>::new().poll_next_chunk(cx))
            .await
            .is_none());
    }

    // `Bytes` yields its content once and then reports end of body.
    #[ntex::test]
    async fn test_bytes() {
        let mut b = Bytes::from("test");
        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b.clone()).get_ref(), b"test");

        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // `BytesMut`, first through `Body` and then as a `MessageBody` on its own.
    #[ntex::test]
    async fn test_bytes_mut() {
        let mut b = Body::from(BytesMut::from("test"));
        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(b.get_ref(), b"test");
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);

        let mut b = BytesMut::from("test");
        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        // Taking the chunk leaves the buffer empty.
        assert_eq!(b.size(), BodySize::Sized(0));
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // `String` and `&String` conversions, and `String` as a `MessageBody`.
    #[ntex::test]
    async fn test_string() {
        let mut b = "test".to_owned();
        assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
        assert_eq!(Body::from(b.clone()).get_ref(), b"test");
        assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
        assert_eq!(Body::from(&b).get_ref(), b"test");

        assert_eq!(b.size(), BodySize::Sized(4));
        assert_eq!(
            poll_fn(|cx| b.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("test"))
        );
        assert!(poll_fn(|cx| b.poll_next_chunk(cx)).await.is_none(),);
    }

    // The unit type reports `Empty` and never yields a chunk.
    #[ntex::test]
    async fn test_unit() {
        assert_eq!(().size(), BodySize::Empty);
        assert!(poll_fn(|cx| ().poll_next_chunk(cx)).await.is_none());
    }

    // `Box<T>` forwards to the body it contains.
    #[ntex::test]
    async fn test_box() {
        let mut val = Box::new(());
        assert_eq!(val.size(), BodySize::Empty);
        assert!(poll_fn(|cx| val.poll_next_chunk(cx)).await.is_none());
    }

    // `PartialEq` for `Body` across the comparable variants.
    #[ntex::test]
    #[allow(clippy::eq_op)]
    async fn test_body_eq() {
        assert!(Body::None == Body::None);
        assert!(Body::None != Body::Empty);
        assert!(Body::Empty == Body::Empty);
        assert!(Body::Empty != Body::None);
        assert!(
            Body::Bytes(Bytes::from_static(b"1")) == Body::Bytes(Bytes::from_static(b"1"))
        );
        assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
    }

    // `Debug` output names the variant and shows the `Bytes` payload.
    #[ntex::test]
    async fn test_body_debug() {
        assert!(format!("{:?}", Body::None).contains("Body::None"));
        assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
        assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains('1'));
    }

    // `BodyStream` converts into `Body::Message`, which never equals `Body::None`.
    #[ntex::test]
    async fn body_stream() {
        let st = BodyStream::new(stream::once(Ready::<_, io::Error>::Ok(Bytes::from("1"))));
        assert!(format!("{st:?}").contains("BodyStream"));
        let body: Body = st.into();
        assert!(format!("{body:?}").contains("Body::Message(_)"));
        assert!(body != Body::None);

        let res = ResponseBody::new(body);
        assert!(res.as_ref().is_some());
    }

    // Same checks as `body_stream`, for `BoxedBodyStream`.
    #[ntex::test]
    async fn boxed_body_stream() {
        let st = BoxedBodyStream::new(stream::once(Ready::<_, Rc<dyn Error>>::Ok(
            Bytes::from("1"),
        )));
        assert!(format!("{st:?}").contains("BoxedBodyStream"));
        let body: Body = st.into();
        assert!(format!("{body:?}").contains("Body::Message(_)"));
        assert!(body != Body::None);

        let res = ResponseBody::new(body);
        assert!(res.as_ref().is_some());
    }

    // The empty chunk between "1" and "2" must not be surfaced by `BodyStream`.
    #[ntex::test]
    async fn body_skips_empty_chunks() {
        let mut body = BodyStream::new(stream::iter(
            ["1", "", "2"]
                .iter()
                .map(|&v| Ok(Bytes::from(v)) as Result<Bytes, io::Error>),
        ));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("1")),
        );
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("2")),
        );
    }

    // The empty chunk between "1" and "2" must not be surfaced by `SizedStream`.
    #[ntex::test]
    async fn sized_skips_empty_chunks() {
        let mut body = SizedStream::new(
            2,
            stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
        );
        assert!(format!("{body:?}").contains("SizedStream"));
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("1")),
        );
        assert_eq!(
            poll_fn(|cx| body.poll_next_chunk(cx)).await.unwrap().ok(),
            Some(Bytes::from("2")),
        );
    }
}