1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use super::adaptor::{ConduitRequest, RequestInfo};
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use futures::prelude::*;
use hyper::{service, Body, Error, Request, Response, StatusCode};
use tracing::error;
#[derive(Debug)]
pub struct Service;
impl Service {
pub fn from_conduit<H: conduit::Handler>(
handler: Arc<H>,
remote_addr: SocketAddr,
) -> Result<
impl tower_service::Service<
Request<Body>,
Response = Response<Body>,
Error = Error,
Future = impl Future<Output = Result<Response<Body>, Error>> + Send + 'static,
>,
Error,
> {
Ok(service::service_fn(move |request: Request<Body>| {
blocking_handler(handler.clone(), request, remote_addr)
}))
}
}
pub(crate) async fn blocking_handler<H: conduit::Handler>(
handler: Arc<H>,
request: Request<Body>,
remote_addr: SocketAddr,
) -> Result<Response<Body>, Error> {
let (parts, body) = request.into_parts();
let full_body = body.try_concat().await?;
let mut request_info = RequestInfo::new(parts, full_body);
let future = future::poll_fn(move |_| {
tokio_executor::threadpool::blocking(|| {
let mut request = ConduitRequest::new(&mut request_info, remote_addr);
handler
.call(&mut request)
.map(good_response)
.unwrap_or_else(|e| error_response(&e.to_string()))
})
.map_err(|_| panic!("The threadpool shut down"))
});
let (future, handle) = future.remote_handle();
tokio_executor::spawn(future);
handle.await
}
/// Converts a `conduit::Response` into a hyper `Response<Body>`.
///
/// The conduit body is written into an in-memory buffer, the numeric status is
/// validated, and every value of every header key is added as its own header
/// entry.
///
/// Any failure along the way (body write error, status code that does not fit
/// in a `u16` or is rejected by `StatusCode::from_u16`, or a builder error such
/// as an invalid header) yields the response from `error_response` instead.
fn good_response(mut response: conduit::Response) -> Response<Body> {
    let mut body = Vec::new();
    if response.body.write_body(&mut body).is_err() {
        return error_response("Error writing body");
    }

    // Use a checked conversion rather than `as u16`: a truncating cast would
    // silently wrap an out-of-range code (e.g. 65736 -> 200) and report success.
    let status = match <u16 as std::convert::TryFrom<_>>::try_from(response.status.0)
        .map_err(|e| e.to_string())
        .and_then(|code| StatusCode::from_u16(code).map_err(|e| e.to_string()))
    {
        Ok(status) => status,
        Err(message) => return error_response(&message),
    };

    let mut builder = Response::builder();
    builder.status(status);
    for (key, values) in response.headers {
        for value in values {
            builder.header(key.as_str(), value.as_str());
        }
    }

    // Invalid header names/values are reported by the builder when the body is attached.
    builder
        .body(body.into())
        .unwrap_or_else(|e| error_response(&e.to_string()))
}
fn error_response(message: &str) -> Response<Body> {
error!("Internal Server Error: {}", message);
let body = Body::from("Internal Server Error");
Response::builder()
.status(500)
.body(body)
.expect("Unexpected invalid header")
}