use crate::router::{Handler, HandlerObj, ResponseFuture, Router, RouterError};
use crate::web::response;
use failure::{Backtrace, Context, Fail, ResultExt};
use futures::channel::oneshot;
use futures::TryStreamExt;
use hyper::server::accept;
use hyper::service::make_service_fn;
use hyper::{Body, Request, Server, StatusCode};
use rustls::internal::pemfile;
use std::convert::Infallible;
use std::fmt::{self, Display};
use std::fs::File;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{io, thread};
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio::stream::StreamExt;
use tokio_rustls::TlsAcceptor;
/// Error type of the REST API module.
///
/// Wraps an [`ErrorKind`] inside a `failure::Context`; the kind is exposed
/// through `Error::kind`, and cause/backtrace are forwarded from the context.
#[derive(Debug)]
pub struct Error {
// The error kind together with whatever cause/backtrace the context carries.
inner: Context<ErrorKind>,
}
/// The categories of failure reported by the REST API module.
///
/// Each variant's user-facing text is defined by its `#[fail(display = ...)]`
/// attribute; string payloads carry a free-form description.
#[derive(Clone, Eq, PartialEq, Debug, Fail, Serialize, Deserialize)]
pub enum ErrorKind {
/// An internal failure (file access, TLS setup, thread spawning, ...).
#[fail(display = "Internal error: {}", _0)]
Internal(String),
/// The supplied arguments were rejected.
#[fail(display = "Bad arguments: {}", _0)]
Argument(String),
/// The requested item was not found.
#[fail(display = "Not found.")]
NotFound,
/// A failure related to a request.
#[fail(display = "Request error: {}", _0)]
RequestError(String),
/// A failure related to a response.
#[fail(display = "ResponseError error: {}", _0)]
ResponseError(String),
/// An error reported by the router, wrapped unchanged.
#[fail(display = "Router error: {}", _0)]
Router(RouterError),
}
impl Fail for Error {
fn cause(&self) -> Option<&dyn Fail> {
self.inner.cause()
}
fn backtrace(&self) -> Option<&Backtrace> {
self.inner.backtrace()
}
}
impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Display::fmt(&self.inner, f)
}
}
impl Error {
pub fn kind(&self) -> &ErrorKind {
self.inner.get_context()
}
}
impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Error {
Error {
inner: Context::new(kind),
}
}
}
impl From<Context<ErrorKind>> for Error {
fn from(inner: Context<ErrorKind>) -> Error {
Error { inner: inner }
}
}
impl From<RouterError> for Error {
fn from(error: RouterError) -> Error {
Error {
inner: Context::new(ErrorKind::Router(error)),
}
}
}
/// Converts a chain error into an [`Error`] of kind `ErrorKind::Internal`,
/// keeping only the chain error's display text.
impl From<crate::chain::Error> for Error {
    fn from(error: crate::chain::Error) -> Error {
        let message = error.to_string();
        let kind = ErrorKind::Internal(message);
        Error {
            inner: Context::new(kind),
        }
    }
}
/// TLS settings for the API server: where to find the certificate and the
/// private key on disk.
#[derive(Clone)]
pub struct TLSConfig {
/// Path of the certificate file (opened and parsed as PEM when the server
/// configuration is built).
pub certificate: String,
/// Path of the private key file (opened and parsed as PEM-encoded PKCS#8
/// when the server configuration is built).
pub private_key: String,
}
impl TLSConfig {
pub fn new(certificate: String, private_key: String) -> TLSConfig {
TLSConfig {
certificate,
private_key,
}
}
fn load_certs(&self) -> Result<Vec<rustls::Certificate>, Error> {
let certfile = File::open(&self.certificate).context(ErrorKind::Internal(format!(
"failed to open file {}",
self.certificate
)))?;
let mut reader = io::BufReader::new(certfile);
pemfile::certs(&mut reader)
.map_err(|_| ErrorKind::Internal("failed to load certificate".to_string()).into())
}
fn load_private_key(&self) -> Result<rustls::PrivateKey, Error> {
let keyfile = File::open(&self.private_key).context(ErrorKind::Internal(format!(
"failed to open file {}",
self.private_key
)))?;
let mut reader = io::BufReader::new(keyfile);
let keys = pemfile::pkcs8_private_keys(&mut reader)
.map_err(|_| ErrorKind::Internal("failed to load private key".to_string()))?;
if keys.len() != 1 {
return Err(ErrorKind::Internal("expected a single private key".to_string()).into());
}
Ok(keys[0].clone())
}
pub fn build_server_config(&self) -> Result<Arc<rustls::ServerConfig>, Error> {
let certs = self.load_certs()?;
let key = self.load_private_key()?;
let mut cfg = rustls::ServerConfig::new(rustls::NoClientAuth::new());
cfg.set_single_cert(certs, key)
.context(ErrorKind::Internal(
"set single certificate failed".to_string(),
))?;
Ok(Arc::new(cfg))
}
}
/// Handle for the HTTP(S) REST API server.
pub struct ApiServer {
// Sender half of the shutdown channel. It is `None` on construction, is set
// when a server is started, and `stop` sends `()` through it.
shutdown_sender: Option<oneshot::Sender<()>>,
}
impl ApiServer {
pub fn new() -> ApiServer {
ApiServer {
shutdown_sender: None,
}
}
pub fn start(
&mut self,
addr: SocketAddr,
router: Router,
conf: Option<TLSConfig>,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> Result<thread::JoinHandle<()>, Error> {
match conf {
Some(conf) => self.start_tls(addr, router, conf, api_chan),
None => self.start_no_tls(addr, router, api_chan),
}
}
fn start_no_tls(
&mut self,
addr: SocketAddr,
router: Router,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> Result<thread::JoinHandle<()>, Error> {
if self.shutdown_sender.is_some() {
return Err(ErrorKind::Internal(
"Can't start HTTP API server, it's running already".to_string(),
)
.into());
}
let rx = &mut api_chan.1;
let tx = &mut api_chan.0;
let m = oneshot::channel::<()>();
let tx = std::mem::replace(tx, m.0);
self.shutdown_sender = Some(tx);
thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
let server = async move {
let server = Server::bind(&addr)
.serve(make_service_fn(move |_| {
let router = router.clone();
async move { Ok::<_, Infallible>(router) }
}))
.with_graceful_shutdown(async {
rx.await.ok();
});
server.await
};
let mut rt = Runtime::new()
.map_err(|e| eprintln!("HTTP API server error: {}", e))
.unwrap();
if let Err(e) = rt.block_on(server) {
eprintln!("HTTP API server error: {}", e)
}
})
.map_err(|_| ErrorKind::Internal("failed to spawn API thread".to_string()).into())
}
fn start_tls(
&mut self,
addr: SocketAddr,
router: Router,
conf: TLSConfig,
api_chan: &'static mut (oneshot::Sender<()>, oneshot::Receiver<()>),
) -> Result<thread::JoinHandle<()>, Error> {
if self.shutdown_sender.is_some() {
return Err(ErrorKind::Internal(
"Can't start HTTPS API server, it's running already".to_string(),
)
.into());
}
let rx = &mut api_chan.1;
let tx = &mut api_chan.0;
let m = oneshot::channel::<()>();
let tx = std::mem::replace(tx, m.0);
self.shutdown_sender = Some(tx);
let acceptor = TlsAcceptor::from(conf.build_server_config()?);
thread::Builder::new()
.name("apis".to_string())
.spawn(move || {
let server = async move {
let mut listener = TcpListener::bind(&addr).await.expect("failed to bind");
let listener = listener
.incoming()
.and_then(move |s| acceptor.accept(s))
.filter(|r| r.is_ok());
let server = Server::builder(accept::from_stream(listener))
.serve(make_service_fn(move |_| {
let router = router.clone();
async move { Ok::<_, Infallible>(router) }
}))
.with_graceful_shutdown(async {
rx.await.ok();
});
server.await
};
let mut rt = Runtime::new()
.map_err(|e| eprintln!("HTTP API server error: {}", e))
.unwrap();
if let Err(e) = rt.block_on(server) {
eprintln!("HTTP API server error: {}", e)
}
})
.map_err(|_| ErrorKind::Internal("failed to spawn API thread".to_string()).into())
}
pub fn stop(&mut self) -> bool {
if self.shutdown_sender.is_some() {
let tx = self.shutdown_sender.as_mut().unwrap();
let m = oneshot::channel::<()>();
let tx = std::mem::replace(tx, m.0);
tx.send(()).expect("Failed to stop API server");
info!("API server has been stopped");
true
} else {
error!("Can't stop API server, it's not running or doesn't spport stop operation");
false
}
}
}
/// Handler that logs each request's method and path at debug level and then
/// passes the request on to the next handler in the chain.
pub struct LoggingMiddleware {}
impl Handler for LoggingMiddleware {
    /// Logs the request line, then hands the request and the remaining
    /// handlers to the next handler. If no handler is left, answers with
    /// `500 Internal Server Error`.
    fn call(
        &self,
        req: Request<Body>,
        mut handlers: Box<dyn Iterator<Item = HandlerObj>>,
    ) -> ResponseFuture {
        debug!("REST call: {} {}", req.method(), req.uri().path());
        let next_handler = handlers.next();
        match next_handler {
            None => response(StatusCode::INTERNAL_SERVER_ERROR, "no handler found"),
            Some(next) => next.call(req, handlers),
        }
    }
}