1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{Async, Future, Poll};
use hyper::{rt, Server as HyperServer};
use hyper::service::{service_fn};
use ::never::Never;
use ::reject::Reject;
use ::reply::{ReplySealed, Reply};
use ::Request;
/// Wraps `service` in a `Server` that can subsequently be bound to an address.
///
/// The returned server starts with its `pipeline` flag set to `false`;
/// the flag can be switched on afterwards through the server's own methods.
pub fn serve<S>(service: S) -> Server<S>
where
    S: IntoWarpService + 'static,
{
    // Pipelining is opt-in, so a freshly created server has it disabled.
    let pipeline = false;
    Server { pipeline, service }
}
/// A not-yet-bound server: a service together with its connection settings.
///
/// Values are created by `serve` and consumed by the binding methods.
#[derive(Debug)]
pub struct Server<S> {
    /// Forwarded to hyper's `http1_pipeline_flush` builder option at bind time.
    pipeline: bool,
    /// The value that is turned into the request-handling service when binding.
    service: S,
}
impl<S> Server<S>
where
S: IntoWarpService + 'static,
<<S::Service as WarpService>::Reply as Future>::Item: Reply + Send,
<<S::Service as WarpService>::Reply as Future>::Error: Reject + Send,
{
pub fn run(self, addr: impl Into<SocketAddr> + 'static) {
let (addr, fut) = self.bind_ephemeral(addr);
info!("warp drive engaged: listening on {}", addr);
rt::run(fut);
}
pub fn bind(self, addr: impl Into<SocketAddr> + 'static) -> impl Future<Item=(), Error=()> + 'static {
let (_, fut) = self.bind_ephemeral(addr);
fut
}
pub fn bind_ephemeral(self, addr: impl Into<SocketAddr> + 'static) -> (SocketAddr, impl Future<Item=(), Error=()> + 'static) {
let inner = Arc::new(self.service.into_warp_service());
let service = move || {
let inner = inner.clone();
service_fn(move |req| {
ReplyFuture {
inner: inner.call(req)
}
})
};
let srv = HyperServer::bind(&addr.into())
.http1_pipeline_flush(self.pipeline)
.serve(service);
let addr = srv.local_addr();
(addr, srv.map_err(|e| error!("server error: {}", e)))
}
#[doc(hidden)]
pub fn unstable_pipeline(mut self) -> Self {
self.pipeline = true;
self
}
}
/// Conversion of a value into a `WarpService`.
///
/// `Server::bind_ephemeral` performs this conversion once and shares the
/// resulting service behind an `Arc`.
pub trait IntoWarpService {
/// The service produced by the conversion. It must be `Send + Sync + 'static`.
type Service: WarpService + Send + Sync + 'static;
/// Consumes `self` and returns the service.
fn into_warp_service(self) -> Self::Service;
}
/// A request handler: turns a `Request` into a future reply.
pub trait WarpService {
/// The future returned by `call`; it must be `Send`.
type Reply: Future + Send;
/// Handles a single request and returns the future of its reply.
///
/// Takes `&self`, so one service value can be called repeatedly.
fn call(&self, req: Request) -> Self::Reply;
}
/// Adapter around a service's reply future.
///
/// Its `Future` implementation converts whatever the wrapped future yields
/// into a response.
#[derive(Debug)]
struct ReplyFuture<F> {
    /// The wrapped future, as returned by `WarpService::call`.
    inner: F,
}
/// Drives the wrapped future and turns its outcome into a response.
///
/// Both a successful reply and a rejection are converted with their
/// respective `into_response`, so this future never resolves to an error
/// (`Error = Never`).
impl<F> Future for ReplyFuture<F>
where
    F: Future,
    F::Item: Reply,
    F::Error: Reject,
{
    type Item = ::reply::Response;
    type Error = Never;

    #[inline]
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let response = match self.inner.poll() {
            // Nothing to convert yet; propagate the pending state unchanged.
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Ok(Async::Ready(reply)) => reply.into_response(),
            // A rejection is reported to the client as a response, not as an error.
            Err(rejection) => rejection.into_response(),
        };
        Ok(Async::Ready(response))
    }
}