tower_hyper/server/
mod.rs

//! The server portion of tower hyper
2
3use crate::body::{Body, LiftBody};
4use futures::{try_ready, Future, Poll};
5use http_body::Body as HttpBody;
6use hyper::service::Service as HyperService;
7use hyper::{Request, Response};
8use std::fmt;
9use std::marker::PhantomData;
10use tokio_io::{AsyncRead, AsyncWrite};
11use tower_http_util::service::HttpService;
12use tower_service::Service;
13use tower_util::MakeService;
14
15pub use hyper::server::conn::Http;
16
17/// A stream mapping incoming IOs to new services.
18pub type Serve<E> = Box<Future<Item = (), Error = Error<E>> + Send + 'static>;
19
/// Server implementation for hyper.
///
/// `S` is the service maker and `B` is the response body type, which is
/// tracked only at the type level.
#[derive(Debug)]
pub struct Server<S, B> {
    /// Maker asked for a new service each time an IO is served.
    maker: S,
    /// Zero-sized marker tying the server to the body type `B`.
    _pd: PhantomData<B>,
}
26
27/// Error's produced by a `Connection`.
28#[derive(Debug)]
29pub enum Error<E> {
30    /// Error's originating from `hyper`.
31    Protocol(hyper::Error),
32    /// Error's produced from creating the inner service.
33    MakeService(E),
34}
35
/// Adapter around an inner service `T` whose response body type is `B`.
#[derive(Debug)]
struct LiftService<T, B> {
    /// The wrapped service that calls are forwarded to.
    inner: T,
    /// Zero-sized marker recording the response body type `B`.
    _pd: PhantomData<B>,
}
41
/// Future wrapper around an inner response future `F` with body type `B`.
#[derive(Debug)]
struct LiftServiceFuture<F, B> {
    /// The wrapped future being polled.
    inner: F,
    /// Zero-sized marker recording the response body type `B`.
    _pd: PhantomData<B>,
}
47
48impl<S, B> Server<S, B>
49where
50    S: MakeService<(), Request<Body>, Response = Response<B>> + Send + 'static,
51    S::MakeError: Into<crate::Error>,
52    S::Error: Into<crate::Error>,
53    S::Future: Send,
54    S::Service: Service<Request<Body>> + Send,
55    <S::Service as Service<Request<Body>>>::Future: Send + 'static,
56    B: HttpBody + Send + 'static,
57    B::Data: Send + 'static,
58    B::Error: Into<crate::Error> + 'static,
59{
60    /// Create a new server from a `MakeService`
61    pub fn new(maker: S) -> Self {
62        Server {
63            maker,
64            _pd: PhantomData,
65        }
66    }
67
68    /// Serve the `io` stream via default hyper http settings
69    pub fn serve<I>(&mut self, io: I) -> Serve<S::MakeError>
70    where
71        I: AsyncRead + AsyncWrite + Send + 'static,
72    {
73        let http = Http::new();
74        self.serve_with(io, http)
75    }
76
77    /// Serve the `io` stream via the provided hyper http settings
78    pub fn serve_with<I>(&mut self, io: I, http: Http) -> Serve<S::MakeError>
79    where
80        I: AsyncRead + AsyncWrite + Send + 'static,
81    {
82        let fut = self
83            .maker
84            .make_service(())
85            .map_err(Error::MakeService)
86            .and_then(move |svc| {
87                let svc = LiftService::new(svc);
88                http.serve_connection(io, svc).map_err(Error::Protocol)
89            });
90
91        Box::new(fut)
92    }
93}
94
95impl<T, B> LiftService<T, B> {
96    pub(crate) fn new(inner: T) -> Self {
97        LiftService {
98            inner,
99            _pd: PhantomData,
100        }
101    }
102}
103
104impl<T, B> HyperService for LiftService<T, B>
105where
106    B: HttpBody + Send + 'static,
107    B::Data: Send,
108    B::Error: Into<crate::Error>,
109    T: HttpService<Body, ResponseBody = B>,
110    T::Error: Into<crate::Error>,
111{
112    type ReqBody = hyper::Body;
113    type ResBody = LiftBody<B>;
114    type Error = crate::Error;
115    type Future = LiftServiceFuture<T::Future, B>;
116
117    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
118        self.inner.poll_ready().map_err(Into::into)
119    }
120
121    fn call(&mut self, request: Request<Self::ReqBody>) -> Self::Future {
122        let fut = self.inner.call(request.map(Body::from));
123
124        LiftServiceFuture {
125            inner: fut,
126            _pd: PhantomData,
127        }
128    }
129}
130
131impl<F, B> Future for LiftServiceFuture<F, B>
132where
133    F: Future<Item = Response<B>>,
134    F::Error: Into<crate::Error>,
135    B: HttpBody + Send,
136    B::Data: Send,
137    B::Error: Into<crate::Error>,
138{
139    type Item = Response<LiftBody<B>>;
140    type Error = crate::Error;
141
142    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
143        let response = try_ready!(self.inner.poll().map_err(Into::into));
144        Ok(response.map(LiftBody::from).into())
145    }
146}
147
148impl<E: fmt::Debug> fmt::Display for Error<E> {
149    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
150        match &self {
151            Error::Protocol(why) => f.debug_tuple("Protocol").field(why).finish(),
152            Error::MakeService(why) => f.debug_tuple("MakeService").field(why).finish(),
153        }
154    }
155}
156
157impl<E: fmt::Debug> ::std::error::Error for Error<E> {}