#![warn(missing_docs)]
extern crate unicase;
extern crate rs_jsonrpc_core as jsonrpc;
extern crate rs_jsonrpc_server_utils as server_utils;
extern crate net2;
pub extern crate hyper;
#[macro_use]
extern crate log;
mod handler;
mod response;
mod utils;
#[cfg(test)]
mod tests;
use std::io;
use std::sync::{mpsc, Arc};
use std::net::SocketAddr;
use hyper::server;
use jsonrpc::MetaIoHandler;
use jsonrpc::futures::{self, Future, Stream};
use jsonrpc::futures::sync::oneshot;
use server_utils::reactor::{Remote, UninitializedRemote};
pub use server_utils::hosts::{Host, DomainsValidation};
pub use server_utils::cors::{AccessControlAllowOrigin, Origin};
pub use server_utils::tokio_core;
pub use handler::ServerHandler;
pub use utils::{is_host_allowed, cors_header, CorsHeader};
pub use response::Response;
pub enum RequestMiddlewareAction {
Proceed {
should_continue_on_invalid_cors: bool,
request: server::Request,
},
Respond {
should_validate_hosts: bool,
response: Box<Future<Item=server::Response, Error=hyper::Error> + Send>,
}
}
impl From<Response> for RequestMiddlewareAction {
fn from(o: Response) -> Self {
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(futures::future::ok(o.into())),
}
}
}
impl From<server::Response> for RequestMiddlewareAction {
fn from(response: server::Response) -> Self {
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(futures::future::ok(response)),
}
}
}
impl From<server::Request> for RequestMiddlewareAction {
fn from(request: server::Request) -> Self {
RequestMiddlewareAction::Proceed {
should_continue_on_invalid_cors: false,
request,
}
}
}
pub trait RequestMiddleware: Send + Sync + 'static {
fn on_request(&self, request: server::Request) -> RequestMiddlewareAction;
}
impl<F> RequestMiddleware for F where
F: Fn(server::Request) -> RequestMiddlewareAction + Sync + Send + 'static,
{
fn on_request(&self, request: server::Request) -> RequestMiddlewareAction {
(*self)(request)
}
}
#[derive(Default)]
struct NoopRequestMiddleware;
impl RequestMiddleware for NoopRequestMiddleware {
fn on_request(&self, request: server::Request) -> RequestMiddlewareAction {
RequestMiddlewareAction::Proceed {
should_continue_on_invalid_cors: false,
request,
}
}
}
pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
fn read_metadata(&self, _: &server::Request) -> M {
Default::default()
}
}
impl<M, F> MetaExtractor<M> for F where
M: jsonrpc::Metadata,
F: Fn(&server::Request) -> M + Sync + Send + 'static,
{
fn read_metadata(&self, req: &server::Request) -> M {
(*self)(req)
}
}
#[derive(Default)]
struct NoopExtractor;
impl<M: jsonrpc::Metadata> MetaExtractor<M> for NoopExtractor {}
pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
pub handler: Arc<MetaIoHandler<M, S>>,
pub extractor: Arc<MetaExtractor<M>>,
}
impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
fn clone(&self) -> Self {
Rpc {
handler: self.handler.clone(),
extractor: self.extractor.clone(),
}
}
}
type AllowedHosts = Option<Vec<Host>>;
type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum RestApi {
Secure,
Unsecure,
Disabled,
}
pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
handler: Arc<MetaIoHandler<M, S>>,
remote: UninitializedRemote,
meta_extractor: Arc<MetaExtractor<M>>,
request_middleware: Arc<RequestMiddleware>,
cors_domains: CorsDomains,
allowed_hosts: AllowedHosts,
rest_api: RestApi,
keep_alive: bool,
threads: usize,
}
const SENDER_PROOF: &'static str = "Server initialization awaits local address.";
impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
pub fn new<T>(handler: T) -> Self where
T: Into<MetaIoHandler<M, S>>
{
ServerBuilder {
handler: Arc::new(handler.into()),
remote: UninitializedRemote::Unspawned,
meta_extractor: Arc::new(NoopExtractor::default()),
request_middleware: Arc::new(NoopRequestMiddleware::default()),
cors_domains: None,
allowed_hosts: None,
rest_api: RestApi::Disabled,
keep_alive: true,
threads: 1,
}
}
pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self {
self.remote = UninitializedRemote::Shared(remote);
self
}
pub fn rest_api(mut self, rest_api: RestApi) -> Self {
self.rest_api = rest_api;
self
}
pub fn keep_alive(mut self, val: bool) -> Self {
self.keep_alive = val;
self
}
#[cfg(not(unix))]
pub fn threads(mut self, _threads: usize) -> Self {
warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
self
}
#[cfg(unix)]
pub fn threads(mut self, threads: usize) -> Self {
self.threads = threads;
self
}
pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
self.cors_domains = cors_domains.into();
self
}
pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
self.request_middleware = Arc::new(middleware);
self
}
pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
self.meta_extractor = Arc::new(extractor);
self
}
pub fn allow_only_bind_host(mut self) -> Self {
self.allowed_hosts = Some(Vec::new());
self
}
pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
self.allowed_hosts = allowed_hosts.into();
self
}
pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
let cors_domains = self.cors_domains;
let request_middleware = self.request_middleware;
let allowed_hosts = self.allowed_hosts;
let rs_jsonrpc_handler = Rpc {
handler: self.handler,
extractor: self.meta_extractor,
};
let rest_api = self.rest_api;
let keep_alive = self.keep_alive;
let reuse_port = self.threads > 1;
let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let eloop = self.remote.init_with_name("http.worker0")?;
serve(
(shutdown_signal, local_addr_tx),
eloop.remote(),
addr.to_owned(),
cors_domains.clone(),
request_middleware.clone(),
allowed_hosts.clone(),
rs_jsonrpc_handler.clone(),
rest_api,
keep_alive,
reuse_port,
);
let handles = (0..self.threads - 1).map(|i| {
let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let eloop = UninitializedRemote::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
serve(
(shutdown_signal, local_addr_tx),
eloop.remote(),
addr.to_owned(),
cors_domains.clone(),
request_middleware.clone(),
allowed_hosts.clone(),
rs_jsonrpc_handler.clone(),
rest_api,
keep_alive,
reuse_port,
);
Ok((eloop, close, local_addr_rx))
}).collect::<io::Result<Vec<_>>>()?;
let local_addr = recv_address(local_addr_rx);
let mut handles = handles.into_iter().map(|(eloop, close, local_addr_rx)| {
let _ = recv_address(local_addr_rx)?;
Ok((eloop, close))
}).collect::<io::Result<(Vec<_>)>>()?;
handles.push((eloop, close));
let (remotes, close) = handles.into_iter().unzip();
Ok(Server {
address: local_addr?,
remote: Some(remotes),
close: Some(close),
})
}
}
fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
local_addr_rx.recv().map_err(|_| {
io::Error::new(io::ErrorKind::Interrupted, "")
})?
}
fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
remote: tokio_core::reactor::Remote,
addr: SocketAddr,
cors_domains: CorsDomains,
request_middleware: Arc<RequestMiddleware>,
allowed_hosts: AllowedHosts,
rs_jsonrpc_handler: Rpc<M, S>,
rest_api: RestApi,
keep_alive: bool,
reuse_port: bool,
) {
let (shutdown_signal, local_addr_tx) = signals;
remote.spawn(move |handle| {
let handle1 = handle.clone();
let bind = move || {
let listener = match addr {
SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
};
configure_port(reuse_port, &listener)?;
listener.reuse_address(true)?;
listener.bind(&addr)?;
let listener = listener.listen(1024)?;
let listener = tokio_core::net::TcpListener::from_listener(listener, &addr, &handle1)?;
let local_addr = listener.local_addr()?;
Ok((listener, local_addr))
};
let bind_result = match bind() {
Ok((listener, local_addr)) => {
local_addr_tx.send(Ok(local_addr)).expect(SENDER_PROOF);
futures::future::ok((listener, local_addr))
},
Err(err) => {
local_addr_tx.send(Err(err)).expect(SENDER_PROOF);
futures::future::err(())
}
};
let handle = handle.clone();
bind_result.and_then(move |(listener, local_addr)| {
let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
let http = {
let mut http = server::Http::new();
http.keep_alive(keep_alive);
http
};
listener.incoming()
.for_each(move |(socket, addr)| {
http.bind_connection(&handle, socket, addr, ServerHandler::new(
rs_jsonrpc_handler.clone(),
cors_domains.clone(),
allowed_hosts.clone(),
request_middleware.clone(),
rest_api,
));
Ok(())
})
.map_err(|e| {
warn!("Incoming streams error, closing sever: {:?}", e);
})
.select(shutdown_signal.map_err(|e| {
debug!("Shutdown signaller dropped, closing server: {:?}", e);
}))
.map(|_| ())
.map_err(|_| ())
})
});
}
#[cfg(unix)]
fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
use net2::unix::*;
if reuse {
try!(tcp.reuse_port(true));
}
Ok(())
}
#[cfg(not(unix))]
fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
Ok(())
}
pub struct Server {
address: SocketAddr,
remote: Option<Vec<Remote>>,
close: Option<Vec<oneshot::Sender<()>>>,
}
const PROOF: &'static str = "Server is always Some until self is consumed.";
impl Server {
pub fn address(&self) -> &SocketAddr {
&self.address
}
pub fn close(mut self) {
for close in self.close.take().expect(PROOF) {
let _ = close.send(());
}
for remote in self.remote.take().expect(PROOF) {
remote.close();
}
}
pub fn wait(mut self) {
for remote in self.remote.take().expect(PROOF) {
remote.wait();
}
}
}
impl Drop for Server {
fn drop(&mut self) {
self.remote.take().map(|remotes| {
for remote in remotes { remote.close(); }
});
}
}