1use net::{self, ConnectionStream};
2use util::BufStream;
3use util::http::{HttpService, NewHttpService};
4
5use futures::Poll;
6use http;
7use http::status::StatusCode;
8use hyper;
9use hyper::body::{Body, Chunk, Payload};
10use hyper::server::conn::Http;
11use hyper::service::Service as HyperService;
12use util::buf_stream::size_hint::{Builder, SizeHint};
13
14use tokio;
15use tokio::net::TcpListener;
16use tokio::prelude::*;
17
18use std::io;
19use std::net::SocketAddr;
20use std::sync::Arc;
21
22struct Lift<T: HttpService> {
23 inner: T,
24}
25
26struct LiftBody<T: HttpService> {
27 body: T::ResponseBody,
28}
29
30#[derive(Debug)]
31pub struct LiftReqBody {
32 body: Body,
33}
34
35impl<T> Lift<T>
36where
37 T: HttpService<RequestBody = LiftReqBody>,
38{
39 fn new(inner: T) -> Self {
40 Lift { inner }
41 }
42}
43
44impl<T> Payload for LiftBody<T>
45where
46 T: HttpService + 'static,
47 <T::ResponseBody as BufStream>::Item: Send,
48 T::ResponseBody: Send,
49{
50 type Data = <T::ResponseBody as BufStream>::Item;
51 type Error = ::Error;
52
53 fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
54 self.body.poll()
55 .map_err(|_| unimplemented!())
56 }
57}
58
59impl BufStream for LiftReqBody {
60 type Item = Chunk;
61 type Error = ::Error;
62
63 fn poll(&mut self) -> Poll<Option<Self::Item>, ::Error> {
64 Stream::poll(&mut self.body).map_err(|_| ::Error::from(StatusCode::INTERNAL_SERVER_ERROR))
65 }
66}
67
68impl<T> HyperService for Lift<T>
69where
70 T: HttpService<RequestBody = LiftReqBody> + 'static,
71 <T::ResponseBody as BufStream>::Item: Send,
72 T::ResponseBody: Send,
73 T::Future: Send,
74{
75 type ReqBody = Body;
76 type ResBody = LiftBody<T>;
77 type Error = ::Error;
78 type Future = Box<Future<Item = http::Response<Self::ResBody>, Error = Self::Error> + Send>;
79
80 fn call(&mut self, request: http::Request<Self::ReqBody>) -> Self::Future {
81 let request = request.map(|body| LiftReqBody { body });
82 let response = self.inner
83 .call_http(request)
84 .map(|response| response.map(|body| LiftBody { body }))
85 .map_err(|_| unimplemented!())
86 ;
87
88 Box::new(response)
89 }
90}
91
92pub fn run<T>(addr: &SocketAddr, new_service: T) -> io::Result<()>
94where
95 T: NewHttpService<RequestBody = LiftReqBody> + Send + 'static,
96 T::Future: Send,
97 <T::ResponseBody as BufStream>::Item: Send,
98 T::ResponseBody: Send,
99 T::Service: Send,
100 <T::Service as HttpService>::Future: Send,
101{
102 let listener = TcpListener::bind(addr)?;
103
104 tokio::run(serve(listener.incoming(), new_service));
105
106 Ok(())
107}
108
109pub fn serve<S, T>(incoming: S, new_service: T) -> impl Future<Item = (), Error = ()>
113where
114 S: ConnectionStream,
115 S::Item: Send + 'static,
116 T: NewHttpService<RequestBody = LiftReqBody> + Send + 'static,
117 T::Future: Send,
118 <T::ResponseBody as BufStream>::Item: Send,
119 T::ResponseBody: Send,
120 T::Service: Send,
121 <T::Service as HttpService>::Future: Send,
122{
123 let http = Arc::new(Http::new());
124 net::Lift(incoming)
125 .map_err(|e| println!("failed to accept socket; err = {:?}", e))
126 .for_each(move |socket| {
127 let h = http.clone();
128
129 tokio::spawn({
130 new_service
131 .new_http_service()
132 .map_err(|_| unimplemented!())
133 .and_then(move |service| {
134 let service = Lift::new(service);
135
136 h.serve_connection(socket, service)
137 .map(|_| ())
138 .map_err(|e| {
139 println!("failed to serve connection; err={:?}", e);
140 })
141 })
142 })
143 })
144}
145
146impl BufStream for Body {
147 type Item = Chunk;
148 type Error = hyper::Error;
149
150 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
151 Stream::poll(self)
152 }
153
154 fn size_hint(&self) -> SizeHint {
155 let mut builder = Builder::new();
156 if let Some(length) = self.content_length() {
157 if length < usize::max_value() as u64 {
158 let length = length as usize;
159 builder.lower(length).upper(length);
160 } else {
161 builder.lower(usize::max_value());
162 }
163 }
164 builder.build()
165 }
166}