1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//! Types and traits giving an interface between low-level HTTP server implementations
//! and services that use them. The interface is based on the `std::future` API.

#![cfg_attr(feature = "nightly", deny(missing_docs))]
#![cfg_attr(test, deny(warnings))]
#![feature(futures_api, async_await, await_macro, arbitrary_self_types)]

use bytes::Bytes;
use futures::{
    future,
    prelude::*,
    stream::{self, StreamObj},
    task::Waker,
    Poll,
};

use std::marker::Unpin;
use std::pin::Pin;

/// The raw body of an HTTP request or response.
///
/// A body is a stream of `Bytes` chunks, which are shared handles to byte buffers.
/// Every item of the stream is a `Result`, so producing a chunk may fail with a
/// `std::io::Error`.
///
/// Both `Body` and `Bytes` values can be easily created from standard owned byte buffer types
/// like `Vec<u8>` or `String`, using the `From` trait.
pub struct Body {
    // Boxed, type-erased stream of chunks; the concrete stream type supplied to
    // `Body::from_stream` is hidden behind this object.
    stream: StreamObj<'static, Result<Bytes, std::io::Error>>,
}

impl Body {
    /// Create an empty body.
    ///
    /// The body wraps `stream::empty()`, so it ends without producing any chunk.
    pub fn empty() -> Self {
        Body::from_stream(stream::empty())
    }

    /// Create a body from a stream of `Bytes`.
    ///
    /// The stream is boxed and stored as a type-erased `StreamObj`, which is why it
    /// must be `Send + 'static`. Each item is either the next chunk of the body or
    /// the I/O error that interrupted it.
    pub fn from_stream<S>(s: S) -> Self
    where
        S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
    {
        Self {
            stream: StreamObj::new(Box::new(s)),
        }
    }

    /// Reads the stream into a new `Vec`.
    ///
    /// Consumes the body, appending every chunk in arrival order until the stream ends.
    ///
    /// # Errors
    ///
    /// Returns the first `std::io::Error` yielded by the underlying stream; bytes
    /// collected before the failure are discarded.
    pub async fn into_vec(mut self) -> std::io::Result<Vec<u8>> {
        let mut bytes = Vec::new();
        while let Some(chunk) = await!(self.next()) {
            // `Bytes` derefs to `[u8]`, so the whole chunk is appended as one slice
            // copy instead of being pushed one byte at a time through an iterator.
            bytes.extend_from_slice(&chunk?);
        }
        Ok(bytes)
    }
}

impl<T: Into<Bytes> + Send> From<T> for Body {
    fn from(x: T) -> Self {
        Self::from_stream(stream::once(future::ok(x.into())))
    }
}

// Explicitly mark `Body` as `Unpin`. The `Stream` impl for `Body` takes
// `&mut self.stream` through a `Pin<&mut Self>`, and `into_vec` calls `self.next()`;
// both need `Body: Unpin` to compile.
impl Unpin for Body {}

/// `Body` is itself a stream of chunks: polling it simply polls the wrapped stream.
impl Stream for Body {
    type Item = std::io::Result<Bytes>;

    fn poll_next(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<Self::Item>> {
        // Re-pin the inner stream object and forward the poll to it unchanged.
        let inner = Pin::new(&mut self.stream);
        inner.poll_next(waker)
    }
}

/// An HTTP request with a streaming body.
///
/// This is `http::Request` with its body type fixed to `Body`.
pub type Request = http::Request<Body>;

/// An HTTP response with a streaming body.
///
/// This is `http::Response` with its body type fixed to `Body`.
pub type Response = http::Response<Body>;

/// An async HTTP service.
///
/// An instance represents a service as a whole. The associated `Connection` type
/// represents a particular connection, and may carry connection-specific state.
pub trait HttpService: Send + Sync + 'static {
    /// An individual connection.
    ///
    /// This associated type is used to establish and hold any per-connection state
    /// needed by the service.
    type Connection: Send + 'static;

    /// A future for setting up an individual connection.
    ///
    /// This is the type returned by `connect`; on success it yields the
    /// `Connection` value for the new connection.
    ///
    /// Returning an error will result in the server immediately dropping
    /// the connection.
    type ConnectionFuture: Send + 'static + TryFuture<Ok = Self::Connection>;

    /// Initiate a new connection.
    ///
    /// This method is called each time the server receives a new connection request,
    /// but before actually exchanging any data with the client.
    ///
    /// It is given access to the global service (`&self`), which may provide
    /// handles to connection pools, thread pools, or other global data.
    fn connect(&self) -> Self::ConnectionFuture;

    /// The async computation for producing the response.
    ///
    /// This is the type returned by `respond`; on success it yields the `Response`.
    ///
    /// Returning an error will result in the server immediately dropping
    /// the connection. It is usually preferable to instead return an HTTP response
    /// with an error status code.
    type Fut: Send + 'static + TryFuture<Ok = Response>;

    /// Begin handling a single request.
    ///
    /// The handler is given shared access to the service itself, and mutable access
    /// to the state for the connection where the request is taking place.
    fn respond(&self, conn: &mut Self::Connection, req: Request) -> Self::Fut;
}

/// Any shareable function from `Request` to a response future is a service.
///
/// Such a service keeps no per-connection state: its `Connection` is `()`, and
/// setting one up completes immediately and never fails.
impl<F, Fut> HttpService for F
where
    F: Fn(Request) -> Fut + Send + Sync + 'static,
    Fut: TryFuture<Ok = Response> + Send + 'static,
    Fut::Error: Send,
{
    type Connection = ();
    type ConnectionFuture = future::Ready<Result<(), Fut::Error>>;
    type Fut = Fut;

    fn connect(&self) -> Self::ConnectionFuture {
        // Nothing to set up, so hand back an already-completed success.
        future::ready(Ok(()))
    }

    fn respond(&self, _conn: &mut Self::Connection, req: Request) -> Self::Fut {
        // The connection state is the unit type; only the request matters.
        self(req)
    }
}