tower_web/
run.rs

1use net::{self, ConnectionStream};
2use util::BufStream;
3use util::http::{HttpService, NewHttpService};
4
5use futures::Poll;
6use http;
7use http::status::StatusCode;
8use hyper;
9use hyper::body::{Body, Chunk, Payload};
10use hyper::server::conn::Http;
11use hyper::service::Service as HyperService;
12use util::buf_stream::size_hint::{Builder, SizeHint};
13
14use tokio;
15use tokio::net::TcpListener;
16use tokio::prelude::*;
17
18use std::io;
19use std::net::SocketAddr;
20use std::sync::Arc;
21
22struct Lift<T: HttpService> {
23    inner: T,
24}
25
26struct LiftBody<T: HttpService> {
27    body: T::ResponseBody,
28}
29
30#[derive(Debug)]
31pub struct LiftReqBody {
32    body: Body,
33}
34
35impl<T> Lift<T>
36where
37    T: HttpService<RequestBody = LiftReqBody>,
38{
39    fn new(inner: T) -> Self {
40        Lift { inner }
41    }
42}
43
44impl<T> Payload for LiftBody<T>
45where
46    T: HttpService + 'static,
47    <T::ResponseBody as BufStream>::Item: Send,
48    T::ResponseBody: Send,
49{
50    type Data = <T::ResponseBody as BufStream>::Item;
51    type Error = ::Error;
52
53    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
54        self.body.poll()
55            .map_err(|_| unimplemented!())
56    }
57}
58
59impl BufStream for LiftReqBody {
60    type Item = Chunk;
61    type Error = ::Error;
62
63    fn poll(&mut self) -> Poll<Option<Self::Item>, ::Error> {
64        Stream::poll(&mut self.body).map_err(|_| ::Error::from(StatusCode::INTERNAL_SERVER_ERROR))
65    }
66}
67
68impl<T> HyperService for Lift<T>
69where
70    T: HttpService<RequestBody = LiftReqBody> + 'static,
71    <T::ResponseBody as BufStream>::Item: Send,
72    T::ResponseBody: Send,
73    T::Future: Send,
74{
75    type ReqBody = Body;
76    type ResBody = LiftBody<T>;
77    type Error = ::Error;
78    type Future = Box<Future<Item = http::Response<Self::ResBody>, Error = Self::Error> + Send>;
79
80    fn call(&mut self, request: http::Request<Self::ReqBody>) -> Self::Future {
81        let request = request.map(|body| LiftReqBody { body });
82        let response = self.inner
83            .call_http(request)
84            .map(|response| response.map(|body| LiftBody { body }))
85            .map_err(|_| unimplemented!())
86            ;
87
88        Box::new(response)
89    }
90}
91
92/// Run a service
93pub fn run<T>(addr: &SocketAddr, new_service: T) -> io::Result<()>
94where
95    T: NewHttpService<RequestBody = LiftReqBody> + Send + 'static,
96    T::Future: Send,
97    <T::ResponseBody as BufStream>::Item: Send,
98    T::ResponseBody: Send,
99    T::Service: Send,
100    <T::Service as HttpService>::Future: Send,
101{
102    let listener = TcpListener::bind(addr)?;
103
104    tokio::run(serve(listener.incoming(), new_service));
105
106    Ok(())
107}
108
109/// Returns a future that must be polled to process the incoming connections.
110///
111/// A non-blocking version of `run`.
112pub fn serve<S, T>(incoming: S, new_service: T) -> impl Future<Item = (), Error = ()>
113where
114    S: ConnectionStream,
115    S::Item: Send + 'static,
116    T: NewHttpService<RequestBody = LiftReqBody> + Send + 'static,
117    T::Future: Send,
118    <T::ResponseBody as BufStream>::Item: Send,
119    T::ResponseBody: Send,
120    T::Service: Send,
121    <T::Service as HttpService>::Future: Send,
122{
123    let http = Arc::new(Http::new());
124    net::Lift(incoming)
125        .map_err(|e| println!("failed to accept socket; err = {:?}", e))
126        .for_each(move |socket| {
127            let h = http.clone();
128
129            tokio::spawn({
130                new_service
131                    .new_http_service()
132                    .map_err(|_| unimplemented!())
133                    .and_then(move |service| {
134                        let service = Lift::new(service);
135
136                        h.serve_connection(socket, service)
137                            .map(|_| ())
138                            .map_err(|e| {
139                                println!("failed to serve connection; err={:?}", e);
140                            })
141                    })
142            })
143        })
144}
145
146impl BufStream for Body {
147    type Item = Chunk;
148    type Error = hyper::Error;
149
150    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
151        Stream::poll(self)
152    }
153
154    fn size_hint(&self) -> SizeHint {
155        let mut builder = Builder::new();
156        if let Some(length) = self.content_length() {
157            if length < usize::max_value() as u64 {
158                let length = length as usize;
159                builder.lower(length).upper(length);
160            } else {
161                builder.lower(usize::max_value());
162            }
163        }
164        builder.build()
165    }
166}