hyper 0.11.27

A modern HTTP library.
Documentation
use std::fmt;

use bytes::Bytes;
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use futures::sync::{mpsc, oneshot};
#[cfg(feature = "tokio-proto")]
use tokio_proto;
use std::borrow::Cow;

use common::Never;
use super::Chunk;

#[cfg(feature = "tokio-proto")]
pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>;
pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;

/// A `Stream` for `Chunk`s used in requests and responses.
#[must_use = "streams do nothing unless polled"]
pub struct Body {
    kind: Kind,
    /// Allow the client to pass a future to delay the `Body` from returning
    /// EOF. This allows the `Client` to try to put the idle connection
    /// back into the pool before the body is "finished".
    ///
    /// The reason for this is so that creating a new request after finishing
    /// streaming the body of a response could sometimes result in creating
    /// a brand new connection, since the pool didn't know about the idle
    /// connection yet.
    delayed_eof: Option<DelayEof>,
}

#[derive(Debug)]
enum Kind {
    #[cfg(feature = "tokio-proto")]
    Tokio(TokioBody),
    Chan {
        close_tx: oneshot::Sender<bool>,
        rx: mpsc::Receiver<Result<Chunk, ::Error>>,
    },
    Once(Option<Chunk>),
    Empty,
}

type DelayEofUntil = oneshot::Receiver<Never>;

enum DelayEof {
    /// Initial state, stream hasn't seen EOF yet.
    NotEof(DelayEofUntil),
    /// Transitions to this state once we've seen `poll` try to
    /// return EOF (`None`). This future is then polled, and
    /// when it completes, the Body finally returns EOF (`None`).
    Eof(DelayEofUntil),
}

//pub(crate)
#[derive(Debug)]
pub struct ChunkSender {
    close_rx: oneshot::Receiver<bool>,
    close_rx_check: bool,
    tx: BodySender,
}

impl Body {
    /// Return an empty body stream
    #[inline]
    pub fn empty() -> Body {
        Body::new(Kind::Empty)
    }

    /// Return a body stream with an associated sender half
    #[inline]
    pub fn pair() -> (mpsc::Sender<Result<Chunk, ::Error>>, Body) {
        let (tx, rx) = channel();
        (tx.tx, rx)
    }

    /// Returns if this body was constructed via `Body::empty()`.
    ///
    /// # Note
    ///
    /// This does **not** detect if the body stream may be at the end, or
    /// if the stream will not yield any chunks, in all cases. For instance,
    /// a streaming body using `chunked` encoding is not able to tell if
    /// there are more chunks immediately.
    #[inline]
    pub fn is_empty(&self) -> bool {
        match self.kind {
            Kind::Empty => true,
            _ => false,
        }
    }

    fn new(kind: Kind) -> Body {
        Body {
            kind: kind,
            delayed_eof: None,
        }
    }

    pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
        self.delayed_eof = Some(DelayEof::NotEof(fut));
    }

    fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
        match self.delayed_eof.take() {
            Some(DelayEof::NotEof(mut delay)) => {
                match self.poll_inner() {
                    ok @ Ok(Async::Ready(Some(..))) |
                    ok @ Ok(Async::NotReady) => {
                        self.delayed_eof = Some(DelayEof::NotEof(delay));
                        ok
                    },
                    Ok(Async::Ready(None)) => match delay.poll() {
                        Ok(Async::Ready(never)) => match never {},
                        Ok(Async::NotReady) => {
                            self.delayed_eof = Some(DelayEof::Eof(delay));
                            Ok(Async::NotReady)
                        },
                        Err(_done) => {
                            Ok(Async::Ready(None))
                        },
                    },
                    Err(e) => Err(e),
                }
            },
            Some(DelayEof::Eof(mut delay)) => {
                match delay.poll() {
                    Ok(Async::Ready(never)) => match never {},
                    Ok(Async::NotReady) => {
                        self.delayed_eof = Some(DelayEof::Eof(delay));
                        Ok(Async::NotReady)
                    },
                    Err(_done) => {
                        Ok(Async::Ready(None))
                    },
                }
            },
            None => self.poll_inner(),
        }
    }

    fn poll_inner(&mut self) -> Poll<Option<Chunk>, ::Error> {
        match self.kind {
            #[cfg(feature = "tokio-proto")]
            Kind::Tokio(ref mut rx) => rx.poll(),
            Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") {
                Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))),
                Async::Ready(Some(Err(err))) => Err(err),
                Async::Ready(None) => Ok(Async::Ready(None)),
                Async::NotReady => Ok(Async::NotReady),
            },
            Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
            Kind::Empty => Ok(Async::Ready(None)),
        }
    }
}

impl Default for Body {
    #[inline]
    fn default() -> Body {
        Body::empty()
    }
}

impl Stream for Body {
    type Item = Chunk;
    type Error = ::Error;

    #[inline]
    fn poll(&mut self) -> Poll<Option<Chunk>, ::Error> {
        self.poll_eof()
    }
}

impl fmt::Debug for Body {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_tuple("Body")
            .field(&self.kind)
            .finish()
    }
}

//pub(crate)
pub fn channel() -> (ChunkSender, Body) {
    let (tx, rx) = mpsc::channel(0);
    let (close_tx, close_rx) = oneshot::channel();

    let tx = ChunkSender {
        close_rx: close_rx,
        close_rx_check: true,
        tx: tx,
    };
    let rx = Body::new(Kind::Chan {
        close_tx: close_tx,
        rx: rx,
    });

    (tx, rx)
}

impl ChunkSender {
    pub fn poll_ready(&mut self) -> Poll<(), ()> {
        if self.close_rx_check {
            match self.close_rx.poll() {
                Ok(Async::Ready(true)) | Err(_) => return Err(()),
                Ok(Async::Ready(false)) => {
                    // needed to allow converting into a plain mpsc::Receiver
                    // if it has been, the tx will send false to disable this check
                    self.close_rx_check = false;
                }
                Ok(Async::NotReady) => (),
            }
        }

        self.tx.poll_ready().map_err(|_| ())
    }

    pub fn start_send(&mut self, msg: Result<Chunk, ::Error>) -> StartSend<(), ()> {
        match self.tx.start_send(msg) {
            Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
            Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(())),
            Err(_) => Err(()),
        }
    }
}

feat_server_proto! {
    impl From<Body> for tokio_proto::streaming::Body<Chunk, ::Error> {
        fn from(b: Body) -> tokio_proto::streaming::Body<Chunk, ::Error> {
            match b.kind {
                Kind::Tokio(b) => b,
                Kind::Chan { close_tx, rx } => {
                    // disable knowing if the Rx gets dropped, since we cannot
                    // pass this tx along.
                    let _ = close_tx.send(false);
                    rx.into()
                },
                Kind::Once(Some(chunk)) => TokioBody::from(chunk),
                Kind::Once(None) |
                Kind::Empty => TokioBody::empty(),
            }
        }
    }

    impl From<tokio_proto::streaming::Body<Chunk, ::Error>> for Body {
        fn from(tokio_body: tokio_proto::streaming::Body<Chunk, ::Error>) -> Body {
            Body::new(Kind::Tokio(tokio_body))
        }
    }
}

impl From<mpsc::Receiver<Result<Chunk, ::Error>>> for Body {
    #[inline]
    fn from(src: mpsc::Receiver<Result<Chunk, ::Error>>) -> Body {
        let (tx, _) = oneshot::channel();
        Body::new(Kind::Chan {
            close_tx: tx,
            rx: src,
        })
    }
}

impl From<Chunk> for Body {
    #[inline]
    fn from (chunk: Chunk) -> Body {
        Body::new(Kind::Once(Some(chunk)))
    }
}

impl From<Bytes> for Body {
    #[inline]
    fn from (bytes: Bytes) -> Body {
        Body::from(Chunk::from(bytes))
    }
}

impl From<Vec<u8>> for Body {
    #[inline]
    fn from (vec: Vec<u8>) -> Body {
        Body::from(Chunk::from(vec))
    }
}

impl From<&'static [u8]> for Body {
    #[inline]
    fn from (slice: &'static [u8]) -> Body {
        Body::from(Chunk::from(slice))
    }
}

impl From<Cow<'static, [u8]>> for Body {
    #[inline]
    fn from (cow: Cow<'static, [u8]>) -> Body {
        match cow {
            Cow::Borrowed(b) => Body::from(b),
            Cow::Owned(o) => Body::from(o)
        }
    }
}

impl From<String> for Body {
    #[inline]
    fn from (s: String) -> Body {
        Body::from(Chunk::from(s.into_bytes()))
    }
}

impl From<&'static str> for Body {
    #[inline]
    fn from(slice: &'static str) -> Body {
        Body::from(Chunk::from(slice.as_bytes()))
    }
}

impl From<Cow<'static, str>> for Body {
    #[inline]
    fn from(cow: Cow<'static, str>) -> Body {
        match cow {
            Cow::Borrowed(b) => Body::from(b),
            Cow::Owned(o) => Body::from(o)
        }
    }
}

impl From<Option<Body>> for Body {
    #[inline]
    fn from (body: Option<Body>) -> Body {
        body.unwrap_or_default()
    }
}

fn _assert_send_sync() {
    fn _assert_send<T: Send>() {}
    fn _assert_sync<T: Sync>() {}

    _assert_send::<Body>();
    _assert_send::<Chunk>();
    _assert_sync::<Chunk>();
}

#[test]
fn test_body_stream_concat() {
    use futures::{Sink, Stream, Future};
    let (tx, body) = Body::pair();

    ::std::thread::spawn(move || {
        let tx = tx.send(Ok("hello ".into())).wait().unwrap();
        tx.send(Ok("world".into())).wait().unwrap();
    });

    let total = body.concat2().wait().unwrap();
    assert_eq!(total.as_ref(), b"hello world");

}