1#![warn(missing_docs)]
21
22#[macro_use] extern crate log;
23extern crate unicase;
24extern crate jsonrpc_core as jsonrpc;
25extern crate jsonrpc_server_utils as server_utils;
26extern crate net2;
27
28pub extern crate hyper;
29
30mod response;
31mod handler;
32mod utils;
33#[cfg(test)]
34mod tests;
35
36use std::{fmt, io};
37use std::sync::{mpsc, Arc};
38use std::net::SocketAddr;
39
40use hyper::server;
41use jsonrpc::MetaIoHandler;
42use jsonrpc::futures::{self, Future, IntoFuture, BoxFuture, Stream};
43use jsonrpc::futures::sync::oneshot;
44use server_utils::reactor::{Remote, UninitializedRemote};
45
46pub use server_utils::hosts::{Host, DomainsValidation};
47pub use server_utils::cors::{AccessControlAllowOrigin, Origin};
48pub use server_utils::tokio_core;
49pub use handler::ServerHandler;
50pub use utils::{is_host_allowed, cors_header, CorsHeader};
51pub use response::Response;
52
53pub type ServerResult = Result<Server, Error>;
55
56#[derive(Debug)]
58pub enum Error {
59 Io(std::io::Error),
61 Other(hyper::error::Error),
63}
64
65impl From<std::io::Error> for Error {
66 fn from(err: std::io::Error) -> Self {
67 Error::Io(err)
68 }
69}
70
71impl From<hyper::error::Error> for Error {
72 fn from(err: hyper::error::Error) -> Self {
73 match err {
74 hyper::error::Error::Io(e) => Error::Io(e),
75 e => Error::Other(e)
76 }
77 }
78}
79
80impl fmt::Display for Error {
81 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
82 match *self {
83 Error::Io(ref e) => e.fmt(f),
84 Error::Other(ref e) => e.fmt(f),
85 }
86 }
87}
88
89impl ::std::error::Error for Error {
90 fn description(&self) -> &str {
91 "Starting the JSON-RPC HTTP server failed"
92 }
93
94 fn cause(&self) -> Option<&::std::error::Error> {
95 Some(match *self {
96 Error::Io(ref e) => e,
97 Error::Other(ref e) => e,
98 })
99 }
100}
101
102pub enum RequestMiddlewareAction {
104 Proceed {
106 should_continue_on_invalid_cors: bool,
109 },
110 Respond {
112 should_validate_hosts: bool,
114 handler: BoxFuture<server::Response, hyper::Error>,
116 }
117}
118
119impl From<Option<Response>> for RequestMiddlewareAction {
120 fn from(o: Option<Response>) -> Self {
121 o.map(Into::<server::Response>::into).map(futures::future::ok).into()
122 }
123}
124impl<T> From<Option<T>> for RequestMiddlewareAction where
125 T: IntoFuture<Item=server::Response, Error=hyper::Error>,
126 T::Future: Send + 'static,
127{
128 fn from(o: Option<T>) -> Self {
129 match o {
130 None => RequestMiddlewareAction::Proceed {
131 should_continue_on_invalid_cors: false,
132 },
133 Some(handler) => RequestMiddlewareAction::Respond {
134 should_validate_hosts: true,
135 handler: handler.into_future().boxed(),
136 },
137 }
138 }
139}
140
141pub trait RequestMiddleware: Send + Sync + 'static {
143 fn on_request(&self, request: &server::Request) -> RequestMiddlewareAction;
145}
146
147impl<F> RequestMiddleware for F where
148 F: Fn(&server::Request) -> RequestMiddlewareAction + Sync + Send + 'static,
149{
150 fn on_request(&self, request: &server::Request) -> RequestMiddlewareAction {
151 (*self)(request)
152 }
153}
154
155#[derive(Default)]
156struct NoopRequestMiddleware;
157impl RequestMiddleware for NoopRequestMiddleware {
158 fn on_request(&self, _request: &server::Request) -> RequestMiddlewareAction {
159 RequestMiddlewareAction::Proceed {
160 should_continue_on_invalid_cors: false,
161 }
162 }
163}
164
165pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
167 fn read_metadata(&self, _: &server::Request) -> M {
169 Default::default()
170 }
171}
172
173impl<M, F> MetaExtractor<M> for F where
174 M: jsonrpc::Metadata,
175 F: Fn(&server::Request) -> M + Sync + Send + 'static,
176{
177 fn read_metadata(&self, req: &server::Request) -> M {
178 (*self)(req)
179 }
180}
181
182#[derive(Default)]
183struct NoopExtractor;
184impl<M: jsonrpc::Metadata> MetaExtractor<M> for NoopExtractor {}
185
186pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
188 pub handler: Arc<MetaIoHandler<M, S>>,
190 pub extractor: Arc<MetaExtractor<M>>,
192}
193
194impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
195 fn clone(&self) -> Self {
196 Rpc {
197 handler: self.handler.clone(),
198 extractor: self.extractor.clone(),
199 }
200 }
201}
202
203type AllowedHosts = Option<Vec<Host>>;
204type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
205
206pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
208 handler: Arc<MetaIoHandler<M, S>>,
209 remote: UninitializedRemote,
210 meta_extractor: Arc<MetaExtractor<M>>,
211 request_middleware: Arc<RequestMiddleware>,
212 cors_domains: CorsDomains,
213 allowed_hosts: AllowedHosts,
214 keep_alive: bool,
215 threads: usize,
216}
217
218const SENDER_PROOF: &'static str = "Server initialization awaits local address.";
219
220impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
221 pub fn new<T>(handler: T) -> Self where
230 T: Into<MetaIoHandler<M, S>>
231 {
232 ServerBuilder {
233 handler: Arc::new(handler.into()),
234 remote: UninitializedRemote::Unspawned,
235 meta_extractor: Arc::new(NoopExtractor::default()),
236 request_middleware: Arc::new(NoopRequestMiddleware::default()),
237 cors_domains: None,
238 allowed_hosts: None,
239 keep_alive: true,
240 threads: 1,
241 }
242 }
243
244 pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self {
247 self.remote = UninitializedRemote::Shared(remote);
248 self
249 }
250
251 pub fn keep_alive(mut self, val: bool) -> Self {
254 self.keep_alive = val;
255 self
256 }
257
258 #[cfg(not(unix))]
261 pub fn threads(mut self, _threads: usize) -> Self {
262 warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
263 self
264 }
265
266 #[cfg(unix)]
269 pub fn threads(mut self, threads: usize) -> Self {
270 self.threads = threads;
271 self
272 }
273
274 pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
276 self.cors_domains = cors_domains.into();
277 self
278 }
279
280 pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
282 self.request_middleware = Arc::new(middleware);
283 self
284 }
285
286 pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
288 self.meta_extractor = Arc::new(extractor);
289 self
290 }
291
292 pub fn allow_only_bind_host(mut self) -> Self {
294 self.allowed_hosts = Some(Vec::new());
295 self
296 }
297
298 pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
300 self.allowed_hosts = allowed_hosts.into();
301 self
302 }
303
304 pub fn start_http(self, addr: &SocketAddr) -> ServerResult {
306 let cors_domains = self.cors_domains;
307 let request_middleware = self.request_middleware;
308 let allowed_hosts = self.allowed_hosts;
309 let jsonrpc_handler = Rpc {
310 handler: self.handler,
311 extractor: self.meta_extractor,
312 };
313 let keep_alive = self.keep_alive;
314 let reuse_port = self.threads > 1;
315
316 let (local_addr_tx, local_addr_rx) = mpsc::channel();
317 let (close, shutdown_signal) = oneshot::channel();
318 let eloop = self.remote.init_with_name("http.worker0")?;
319 serve(
320 (shutdown_signal, local_addr_tx),
321 eloop.remote(),
322 addr.to_owned(),
323 cors_domains.clone(),
324 request_middleware.clone(),
325 allowed_hosts.clone(),
326 jsonrpc_handler.clone(),
327 keep_alive,
328 reuse_port,
329 );
330 let handles = (0..self.threads - 1).map(|i| {
331 let (local_addr_tx, local_addr_rx) = mpsc::channel();
332 let (close, shutdown_signal) = oneshot::channel();
333 let eloop = UninitializedRemote::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
334 serve(
335 (shutdown_signal, local_addr_tx),
336 eloop.remote(),
337 addr.to_owned(),
338 cors_domains.clone(),
339 request_middleware.clone(),
340 allowed_hosts.clone(),
341 jsonrpc_handler.clone(),
342 keep_alive,
343 reuse_port,
344 );
345 Ok((eloop, close, local_addr_rx))
346 }).collect::<io::Result<Vec<_>>>()?;
347
348 let local_addr = recv_address(local_addr_rx);
350 let mut handles = handles.into_iter().map(|(eloop, close, local_addr_rx)| {
352 let _ = recv_address(local_addr_rx)?;
353 Ok((eloop, close))
354 }).collect::<io::Result<(Vec<_>)>>()?;
355 handles.push((eloop, close));
356 let (remotes, close) = handles.into_iter().unzip();
357
358 Ok(Server {
359 address: local_addr?,
360 remote: Some(remotes),
361 close: Some(close),
362 })
363 }
364}
365
366fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
367 local_addr_rx.recv().map_err(|_| {
368 io::Error::new(io::ErrorKind::Interrupted, "")
369 })?
370}
371
372fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
373 signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
374 remote: tokio_core::reactor::Remote,
375 addr: SocketAddr,
376 cors_domains: CorsDomains,
377 request_middleware: Arc<RequestMiddleware>,
378 allowed_hosts: AllowedHosts,
379 jsonrpc_handler: Rpc<M, S>,
380 keep_alive: bool,
381 reuse_port: bool,
382) {
383 let (shutdown_signal, local_addr_tx) = signals;
384 remote.spawn(move |handle| {
385 let handle1 = handle.clone();
386 let bind = move || {
387 let listener = match addr {
388 SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
389 SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
390 };
391 configure_port(reuse_port, &listener)?;
392 listener.reuse_address(true)?;
393 listener.bind(&addr)?;
394 let listener = listener.listen(1024)?;
395 let listener = tokio_core::net::TcpListener::from_listener(listener, &addr, &handle1)?;
396 let local_addr = listener.local_addr()?;
400
401 Ok((listener, local_addr))
402 };
403
404 let bind_result = match bind() {
405 Ok((listener, local_addr)) => {
406 local_addr_tx.send(Ok(local_addr)).expect(SENDER_PROOF);
408
409 futures::future::ok((listener, local_addr))
410 },
411 Err(err) => {
412 local_addr_tx.send(Err(err)).expect(SENDER_PROOF);
414
415 futures::future::err(())
416 }
417 };
418
419 let handle = handle.clone();
420 bind_result.and_then(move |(listener, local_addr)| {
421 let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
422
423 let http = {
424 let mut http = server::Http::new();
425 http.keep_alive(keep_alive);
426 http
427 };
428 listener.incoming()
429 .for_each(move |(socket, addr)| {
430 http.bind_connection(&handle, socket, addr, ServerHandler::new(
431 jsonrpc_handler.clone(),
432 cors_domains.clone(),
433 allowed_hosts.clone(),
434 request_middleware.clone(),
435 ));
436 Ok(())
437 })
438 .map_err(|e| {
439 warn!("Incoming streams error, closing sever: {:?}", e);
440 })
441 .select(shutdown_signal.map_err(|e| {
442 warn!("Shutdown signaller dropped, closing server: {:?}", e);
443 }))
444 .map(|_| ())
445 .map_err(|_| ())
446 })
447 });
448}
449
450#[cfg(unix)]
451fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
452 use net2::unix::*;
453
454 if reuse {
455 try!(tcp.reuse_port(true));
456 }
457
458 Ok(())
459}
460
461#[cfg(not(unix))]
462fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
463 Ok(())
464}
465
466pub struct Server {
468 address: SocketAddr,
469 remote: Option<Vec<Remote>>,
470 close: Option<Vec<oneshot::Sender<()>>>,
471}
472
473const PROOF: &'static str = "Server is always Some until self is consumed.";
474impl Server {
475 pub fn address(&self) -> &SocketAddr {
477 &self.address
478 }
479
480 pub fn close(mut self) {
482 for close in self.close.take().expect(PROOF) {
483 let _ = close.send(());
484 }
485
486 for remote in self.remote.take().expect(PROOF) {
487 remote.close();
488 }
489 }
490
491 pub fn wait(mut self) {
493 for remote in self.remote.take().expect(PROOF) {
494 remote.wait();
495 }
496 }
497}
498
499impl Drop for Server {
500 fn drop(&mut self) {
501 self.remote.take().map(|remotes| {
502 for remote in remotes { remote.close(); }
503 });
504 }
505}
506