actori_http/
payload.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use futures_core::Stream;
6use h2::RecvStream;
7
8use crate::error::PayloadError;
9
/// Boxed, pinned, dynamically dispatched payload stream.
///
/// Every item the stream yields is either a chunk of [`Bytes`] or a
/// [`PayloadError`].
pub type PayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>;
12
/// Streaming payload.
///
/// `S` is the type stored in the [`Payload::Stream`] variant; when it is not
/// specified it defaults to [`PayloadStream`].
pub enum Payload<S = PayloadStream> {
    /// No payload. Polled as a stream, this variant finishes immediately.
    None,
    /// Payload backed by `crate::h1::Payload`.
    H1(crate::h1::Payload),
    /// Payload backed by `crate::h2::Payload`.
    H2(crate::h2::Payload),
    /// Payload backed by a stream of type `S`.
    Stream(S),
}
20
21impl<S> From<crate::h1::Payload> for Payload<S> {
22    fn from(v: crate::h1::Payload) -> Self {
23        Payload::H1(v)
24    }
25}
26
27impl<S> From<crate::h2::Payload> for Payload<S> {
28    fn from(v: crate::h2::Payload) -> Self {
29        Payload::H2(v)
30    }
31}
32
33impl<S> From<RecvStream> for Payload<S> {
34    fn from(v: RecvStream) -> Self {
35        Payload::H2(crate::h2::Payload::new(v))
36    }
37}
38
39impl From<PayloadStream> for Payload {
40    fn from(pl: PayloadStream) -> Self {
41        Payload::Stream(pl)
42    }
43}
44
45impl<S> Payload<S> {
46    /// Takes current payload and replaces it with `None` value
47    pub fn take(&mut self) -> Payload<S> {
48        std::mem::replace(self, Payload::None)
49    }
50}
51
52impl<S> Stream for Payload<S>
53where
54    S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
55{
56    type Item = Result<Bytes, PayloadError>;
57
58    #[inline]
59    fn poll_next(
60        self: Pin<&mut Self>,
61        cx: &mut Context<'_>,
62    ) -> Poll<Option<Self::Item>> {
63        match self.get_mut() {
64            Payload::None => Poll::Ready(None),
65            Payload::H1(ref mut pl) => pl.readany(cx),
66            Payload::H2(ref mut pl) => Pin::new(pl).poll_next(cx),
67            Payload::Stream(ref mut pl) => Pin::new(pl).poll_next(cx),
68        }
69    }
70}