rama_hyper/body/
incoming.rs

1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
11use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
12#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
13use http::HeaderMap;
14use http_body::{Body, Frame, SizeHint};
15
16#[cfg(all(
17    any(feature = "http1", feature = "http2"),
18    any(feature = "client", feature = "server")
19))]
20use super::DecodedLength;
21#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
22use crate::common::watch;
23#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
24use crate::proto::h2::ping;
25
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
28#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
29type TrailersSender = oneshot::Sender<HeaderMap>;
30
31/// A stream of `Bytes`, used when receiving bodies from the network.
32///
33/// Note that Users should not instantiate this struct directly. When working with the hyper client,
34/// `Incoming` is returned to you in responses. Similarly, when operating with the hyper server,
35/// it is provided within requests.
36///
37/// # Examples
38///
39/// ```rust,ignore
40/// async fn echo(
41///    req: Request<hyper::body::Incoming>,
42/// ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
43///    //Here, you can process `Incoming`
44/// }
45/// ```
46#[must_use = "streams do nothing unless polled"]
47pub struct Incoming {
48    kind: Kind,
49}
50
51enum Kind {
52    Empty,
53    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
54    Chan {
55        content_length: DecodedLength,
56        want_tx: watch::Sender,
57        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
58        trailers_rx: oneshot::Receiver<HeaderMap>,
59    },
60    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
61    H2 {
62        content_length: DecodedLength,
63        data_done: bool,
64        ping: ping::Recorder,
65        recv: h2::RecvStream,
66    },
67    #[cfg(feature = "ffi")]
68    Ffi(crate::ffi::UserBody),
69}
70
71/// A sender half created through [`Body::channel()`].
72///
73/// Useful when wanting to stream chunks from another thread.
74///
75/// ## Body Closing
76///
77/// Note that the request body will always be closed normally when the sender is dropped (meaning
78/// that the empty terminating chunk will be sent to the remote). If you desire to close the
79/// connection with an incomplete response (e.g. in the case of an error during asynchronous
80/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
81///
82/// [`Body::channel()`]: struct.Body.html#method.channel
83/// [`Sender::abort()`]: struct.Sender.html#method.abort
84#[must_use = "Sender does nothing unless sent on"]
85#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
86pub(crate) struct Sender {
87    want_rx: watch::Receiver,
88    data_tx: BodySender,
89    trailers_tx: Option<TrailersSender>,
90}
91
92#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
93const WANT_PENDING: usize = 1;
94#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
95const WANT_READY: usize = 2;
96
97impl Incoming {
98    /// Create a `Body` stream with an associated sender half.
99    ///
100    /// Useful when wanting to stream chunks from another thread.
101    #[inline]
102    #[cfg(test)]
103    pub(crate) fn channel() -> (Sender, Incoming) {
104        Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
105    }
106
107    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
108    pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
109        let (data_tx, data_rx) = mpsc::channel(0);
110        let (trailers_tx, trailers_rx) = oneshot::channel();
111
112        // If wanter is true, `Sender::poll_ready()` won't becoming ready
113        // until the `Body` has been polled for data once.
114        let want = if wanter { WANT_PENDING } else { WANT_READY };
115
116        let (want_tx, want_rx) = watch::channel(want);
117
118        let tx = Sender {
119            want_rx,
120            data_tx,
121            trailers_tx: Some(trailers_tx),
122        };
123        let rx = Incoming::new(Kind::Chan {
124            content_length,
125            want_tx,
126            data_rx,
127            trailers_rx,
128        });
129
130        (tx, rx)
131    }
132
133    fn new(kind: Kind) -> Incoming {
134        Incoming { kind }
135    }
136
137    #[allow(dead_code)]
138    pub(crate) fn empty() -> Incoming {
139        Incoming::new(Kind::Empty)
140    }
141
142    #[cfg(feature = "ffi")]
143    pub(crate) fn ffi() -> Incoming {
144        Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
145    }
146
147    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
148    pub(crate) fn h2(
149        recv: h2::RecvStream,
150        mut content_length: DecodedLength,
151        ping: ping::Recorder,
152    ) -> Self {
153        // If the stream is already EOS, then the "unknown length" is clearly
154        // actually ZERO.
155        if !content_length.is_exact() && recv.is_end_stream() {
156            content_length = DecodedLength::ZERO;
157        }
158
159        Incoming::new(Kind::H2 {
160            data_done: false,
161            ping,
162            content_length,
163            recv,
164        })
165    }
166
167    #[cfg(feature = "ffi")]
168    pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
169        match self.kind {
170            Kind::Ffi(ref mut body) => return body,
171            _ => {
172                self.kind = Kind::Ffi(crate::ffi::UserBody::new());
173            }
174        }
175
176        match self.kind {
177            Kind::Ffi(ref mut body) => body,
178            _ => unreachable!(),
179        }
180    }
181}
182
183impl Body for Incoming {
184    type Data = Bytes;
185    type Error = crate::Error;
186
187    fn poll_frame(
188        #[cfg_attr(
189            not(all(
190                any(feature = "http1", feature = "http2"),
191                any(feature = "client", feature = "server")
192            )),
193            allow(unused_mut)
194        )]
195        mut self: Pin<&mut Self>,
196        #[cfg_attr(
197            not(all(
198                any(feature = "http1", feature = "http2"),
199                any(feature = "client", feature = "server")
200            )),
201            allow(unused_variables)
202        )]
203        cx: &mut Context<'_>,
204    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
205        match self.kind {
206            Kind::Empty => Poll::Ready(None),
207            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
208            Kind::Chan {
209                content_length: ref mut len,
210                ref mut data_rx,
211                ref mut want_tx,
212                ref mut trailers_rx,
213            } => {
214                want_tx.send(WANT_READY);
215
216                if !data_rx.is_terminated() {
217                    if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
218                        len.sub_if(chunk.len() as u64);
219                        return Poll::Ready(Some(Ok(Frame::data(chunk))));
220                    }
221                }
222
223                // check trailers after data is terminated
224                match ready!(Pin::new(trailers_rx).poll(cx)) {
225                    Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
226                    Err(_) => Poll::Ready(None),
227                }
228            }
229            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
230            Kind::H2 {
231                ref mut data_done,
232                ref ping,
233                recv: ref mut h2,
234                content_length: ref mut len,
235            } => {
236                if !*data_done {
237                    match ready!(h2.poll_data(cx)) {
238                        Some(Ok(bytes)) => {
239                            let _ = h2.flow_control().release_capacity(bytes.len());
240                            len.sub_if(bytes.len() as u64);
241                            ping.record_data(bytes.len());
242                            return Poll::Ready(Some(Ok(Frame::data(bytes))));
243                        }
244                        Some(Err(e)) => {
245                            return match e.reason() {
246                                // These reasons should cause the body reading to stop, but not fail it.
247                                // The same logic as for `Read for H2Upgraded` is applied here.
248                                Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
249                                    Poll::Ready(None)
250                                }
251                                _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
252                            };
253                        }
254                        None => {
255                            *data_done = true;
256                            // fall through to trailers
257                        }
258                    }
259                }
260
261                // after data, check trailers
262                match ready!(h2.poll_trailers(cx)) {
263                    Ok(t) => {
264                        ping.record_non_data();
265                        Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
266                    }
267                    Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
268                }
269            }
270
271            #[cfg(feature = "ffi")]
272            Kind::Ffi(ref mut body) => body.poll_data(cx),
273        }
274    }
275
276    fn is_end_stream(&self) -> bool {
277        match self.kind {
278            Kind::Empty => true,
279            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
280            Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
281            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
282            Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
283            #[cfg(feature = "ffi")]
284            Kind::Ffi(..) => false,
285        }
286    }
287
288    fn size_hint(&self) -> SizeHint {
289        #[cfg(all(
290            any(feature = "http1", feature = "http2"),
291            any(feature = "client", feature = "server")
292        ))]
293        fn opt_len(decoded_length: DecodedLength) -> SizeHint {
294            if let Some(content_length) = decoded_length.into_opt() {
295                SizeHint::with_exact(content_length)
296            } else {
297                SizeHint::default()
298            }
299        }
300
301        match self.kind {
302            Kind::Empty => SizeHint::with_exact(0),
303            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
304            Kind::Chan { content_length, .. } => opt_len(content_length),
305            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
306            Kind::H2 { content_length, .. } => opt_len(content_length),
307            #[cfg(feature = "ffi")]
308            Kind::Ffi(..) => SizeHint::default(),
309        }
310    }
311}
312
313impl fmt::Debug for Incoming {
314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315        #[derive(Debug)]
316        struct Streaming;
317        #[derive(Debug)]
318        struct Empty;
319
320        let mut builder = f.debug_tuple("Body");
321        match self.kind {
322            Kind::Empty => builder.field(&Empty),
323            #[cfg(any(
324                all(
325                    any(feature = "http1", feature = "http2"),
326                    any(feature = "client", feature = "server")
327                ),
328                feature = "ffi"
329            ))]
330            _ => builder.field(&Streaming),
331        };
332
333        builder.finish()
334    }
335}
336
337#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
338impl Sender {
339    /// Check to see if this `Sender` can send more data.
340    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
341        // Check if the receiver end has tried polling for the body yet
342        ready!(self.poll_want(cx)?);
343        self.data_tx
344            .poll_ready(cx)
345            .map_err(|_| crate::Error::new_closed())
346    }
347
348    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
349        match self.want_rx.load(cx) {
350            WANT_READY => Poll::Ready(Ok(())),
351            WANT_PENDING => Poll::Pending,
352            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
353            unexpected => unreachable!("want_rx value: {}", unexpected),
354        }
355    }
356
357    #[cfg(test)]
358    async fn ready(&mut self) -> crate::Result<()> {
359        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
360    }
361
362    /// Send data on data channel when it is ready.
363    #[cfg(test)]
364    #[allow(unused)]
365    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
366        self.ready().await?;
367        self.data_tx
368            .try_send(Ok(chunk))
369            .map_err(|_| crate::Error::new_closed())
370    }
371
372    /// Send trailers on trailers channel.
373    #[allow(unused)]
374    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
375        let tx = match self.trailers_tx.take() {
376            Some(tx) => tx,
377            None => return Err(crate::Error::new_closed()),
378        };
379        tx.send(trailers).map_err(|_| crate::Error::new_closed())
380    }
381
382    /// Try to send data on this channel.
383    ///
384    /// # Errors
385    ///
386    /// Returns `Err(Bytes)` if the channel could not (currently) accept
387    /// another `Bytes`.
388    ///
389    /// # Note
390    ///
391    /// This is mostly useful for when trying to send from some other thread
392    /// that doesn't have an async context. If in an async context, prefer
393    /// `send_data()` instead.
394    #[cfg(feature = "http1")]
395    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
396        self.data_tx
397            .try_send(Ok(chunk))
398            .map_err(|err| err.into_inner().expect("just sent Ok"))
399    }
400
401    #[cfg(test)]
402    pub(crate) fn abort(mut self) {
403        self.send_error(crate::Error::new_body_write_aborted());
404    }
405
406    pub(crate) fn send_error(&mut self, err: crate::Error) {
407        let _ = self
408            .data_tx
409            // clone so the send works even if buffer is full
410            .clone()
411            .try_send(Err(err));
412    }
413}
414
415#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
416impl fmt::Debug for Sender {
417    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418        #[derive(Debug)]
419        struct Open;
420        #[derive(Debug)]
421        struct Closed;
422
423        let mut builder = f.debug_tuple("Sender");
424        match self.want_rx.peek() {
425            watch::CLOSED => builder.field(&Closed),
426            _ => builder.field(&Open),
427        };
428
429        builder.finish()
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use std::mem;
436    use std::task::Poll;
437
438    use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
439    use http_body_util::BodyExt;
440
441    #[test]
442    fn test_size_of() {
443        // These are mostly to help catch *accidentally* increasing
444        // the size by too much.
445
446        let body_size = mem::size_of::<Incoming>();
447        let body_expected_size = mem::size_of::<u64>() * 5;
448        assert!(
449            body_size <= body_expected_size,
450            "Body size = {} <= {}",
451            body_size,
452            body_expected_size,
453        );
454
455        //assert_eq!(body_size, mem::size_of::<Option<Incoming>>(), "Option<Incoming>");
456
457        assert_eq!(
458            mem::size_of::<Sender>(),
459            mem::size_of::<usize>() * 5,
460            "Sender"
461        );
462
463        assert_eq!(
464            mem::size_of::<Sender>(),
465            mem::size_of::<Option<Sender>>(),
466            "Option<Sender>"
467        );
468    }
469
470    #[test]
471    fn size_hint() {
472        fn eq(body: Incoming, b: SizeHint, note: &str) {
473            let a = body.size_hint();
474            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
475            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
476        }
477
478        eq(Incoming::empty(), SizeHint::with_exact(0), "empty");
479
480        eq(Incoming::channel().1, SizeHint::new(), "channel");
481
482        eq(
483            Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
484            SizeHint::with_exact(4),
485            "channel with length",
486        );
487    }
488
489    #[cfg(not(miri))]
490    #[tokio::test]
491    async fn channel_abort() {
492        let (tx, mut rx) = Incoming::channel();
493
494        tx.abort();
495
496        let err = rx.frame().await.unwrap().unwrap_err();
497        assert!(err.is_body_write_aborted(), "{:?}", err);
498    }
499
500    #[cfg(all(not(miri), feature = "http1"))]
501    #[tokio::test]
502    async fn channel_abort_when_buffer_is_full() {
503        let (mut tx, mut rx) = Incoming::channel();
504
505        tx.try_send_data("chunk 1".into()).expect("send 1");
506        // buffer is full, but can still send abort
507        tx.abort();
508
509        let chunk1 = rx
510            .frame()
511            .await
512            .expect("item 1")
513            .expect("chunk 1")
514            .into_data()
515            .unwrap();
516        assert_eq!(chunk1, "chunk 1");
517
518        let err = rx.frame().await.unwrap().unwrap_err();
519        assert!(err.is_body_write_aborted(), "{:?}", err);
520    }
521
522    #[cfg(feature = "http1")]
523    #[test]
524    fn channel_buffers_one() {
525        let (mut tx, _rx) = Incoming::channel();
526
527        tx.try_send_data("chunk 1".into()).expect("send 1");
528
529        // buffer is now full
530        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
531        assert_eq!(chunk2, "chunk 2");
532    }
533
534    #[cfg(not(miri))]
535    #[tokio::test]
536    async fn channel_empty() {
537        let (_, mut rx) = Incoming::channel();
538
539        assert!(rx.frame().await.is_none());
540    }
541
542    #[test]
543    fn channel_ready() {
544        let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
545
546        let mut tx_ready = tokio_test::task::spawn(tx.ready());
547
548        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
549    }
550
551    #[test]
552    #[cfg(not(miri))] // TODO issue #3015
553    fn channel_wanter() {
554        let (mut tx, mut rx) =
555            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
556
557        let mut tx_ready = tokio_test::task::spawn(tx.ready());
558        let mut rx_data = tokio_test::task::spawn(rx.frame());
559
560        assert!(
561            tx_ready.poll().is_pending(),
562            "tx isn't ready before rx has been polled"
563        );
564
565        assert!(rx_data.poll().is_pending(), "poll rx.data");
566        assert!(tx_ready.is_woken(), "rx poll wakes tx");
567
568        assert!(
569            tx_ready.poll().is_ready(),
570            "tx is ready after rx has been polled"
571        );
572    }
573
574    #[test]
575    #[cfg(not(miri))] // TODO issue #3015
576    fn channel_notices_closure() {
577        let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
578
579        let mut tx_ready = tokio_test::task::spawn(tx.ready());
580
581        assert!(
582            tx_ready.poll().is_pending(),
583            "tx isn't ready before rx has been polled"
584        );
585
586        drop(rx);
587        assert!(tx_ready.is_woken(), "dropping rx wakes tx");
588
589        match tx_ready.poll() {
590            Poll::Ready(Err(ref e)) if e.is_closed() => (),
591            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
592        }
593    }
594}