use std::future::Future;
use std::io::Error as IoError;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle};
use actix_0_10::System as ActixSystem;
use actix_web_3::{dev::Server, middleware, App, HttpServer};
use futures_0_3::executor::block_on;
use openssl::ssl::SslAcceptorBuilder;
use crate::error::InternalError;
use crate::rest_api::RestApiServerError;
use crate::threading::lifecycle::ShutdownHandle;
use super::ResourceProvider;
#[derive(Debug)]
pub struct BindAddress {
pub addr: SocketAddr,
pub scheme: String,
}
enum FromThreadMessage {
IoError(IoError, String),
Running(Server, Vec<BindAddress>),
}
pub struct RestApi {
bind_addresses: Vec<BindAddress>,
join_handle: Option<JoinHandle<()>>,
server: Server,
shutdown_future: Option<Pin<Box<dyn Future<Output = ()>>>>,
}
impl RestApi {
pub(super) fn new(
bind_url: String,
bind_acceptor_builder: Option<SslAcceptorBuilder>,
resource_providers: Vec<Box<dyn ResourceProvider>>,
) -> Result<Self, RestApiServerError> {
let providers: Arc<Mutex<Vec<_>>> = Arc::new(Mutex::new(resource_providers));
let (sender, receiver) = mpsc::channel();
let join_handle = thread::Builder::new()
.name("SplinterRestApi".into())
.spawn(move || {
let sys = ActixSystem::new("ActixSystem-Splinter-REST-API");
let mut http_server = HttpServer::new(move || {
let app = App::new();
let mut app = app.wrap(middleware::Logger::default());
for provider in providers.lock().unwrap().iter() {
for resource in provider.resources() {
app = app.service(resource)
}
}
app
});
http_server = match if let Some(acceptor_builder) = bind_acceptor_builder {
http_server.bind_openssl(&bind_url, acceptor_builder)
} else {
http_server.bind(&bind_url)
} {
Ok(http_server) => http_server,
Err(err1) => {
let error_msg = format!("Bind to \"{}\" failed", bind_url);
if let Err(err2) =
sender.send(FromThreadMessage::IoError(err1, error_msg.clone()))
{
error!("{}", error_msg);
error!("Failed to notify receiver of bind error: {}", err2);
}
return;
}
};
let bind_addresses = http_server
.addrs_with_scheme()
.iter()
.map(|(addr, scheme)| BindAddress {
addr: *addr,
scheme: scheme.to_string(),
})
.collect();
let server = http_server.disable_signals().system_exit().run();
if let Err(err) = sender.send(FromThreadMessage::Running(server, bind_addresses)) {
error!("Unable to send running message to parent thread: {}", err);
return;
}
match sys.run() {
Ok(()) => info!("Rest API terminating"),
Err(err) => error!("REST API unexpectedly exiting: {}", err),
};
})?;
let (server, bind_addresses) = loop {
match receiver.recv() {
Ok(FromThreadMessage::Running(server, bind_address)) => {
break (server, bind_address);
}
Ok(FromThreadMessage::IoError(err, error_msg)) => Err(
RestApiServerError::StartUpError(format!("{}: {}", error_msg, err)),
),
Err(err) => Err(RestApiServerError::StartUpError(format!(
"Error receiving message from Rest Api thread: {}",
err
))),
}?;
};
Ok(RestApi {
bind_addresses,
join_handle: Some(join_handle),
server,
shutdown_future: None,
})
}
pub fn bind_addresses(&self) -> &Vec<BindAddress> {
&self.bind_addresses
}
}
impl ShutdownHandle for RestApi {
fn signal_shutdown(&mut self) {
self.shutdown_future = Some(Box::pin(self.server.stop(true)));
}
fn wait_for_shutdown(mut self) -> Result<(), InternalError> {
match (self.shutdown_future.take(), self.join_handle.take()) {
(Some(f), Some(join_handle)) => {
block_on(f);
join_handle.join().map_err(|_| {
InternalError::with_message(
"RestApi thread panicked, join() failed".to_string(),
)
})?;
Ok(())
}
(_, _) => Err(InternalError::with_message(
"Called wait_for_shutdown() prior to signal_shutdown()".to_string(),
)),
}
}
}