#[cfg(feature = "rest-api-cors")]
pub mod cors;
mod errors;
mod events;
pub mod paging;
mod response_models;
#[cfg(feature = "json-web-tokens")]
pub mod secrets;
#[cfg(feature = "json-web-tokens")]
pub mod sessions;
use actix_web::{
error::ErrorBadRequest, http::header, middleware, web, App, Error as ActixError, HttpRequest,
HttpResponse, HttpServer,
};
use futures::{future::FutureResult, stream::Stream, Future, IntoFuture};
use percent_encoding::{AsciiSet, CONTROLS};
use protobuf::{self, Message};
use std::boxed::Box;
use std::sync::{mpsc, Arc};
use std::thread;
pub use errors::{RequestError, ResponseError, RestApiServerError};
pub use events::{new_websocket_event_sender, EventSender};
pub use response_models::ErrorResponse;
const QUERY_ENCODE_SET: &AsciiSet = &CONTROLS
.add(b' ')
.add(b'"')
.add(b'<')
.add(b'>')
.add(b'`')
.add(b'=')
.add(b'!')
.add(b'{')
.add(b'}')
.add(b'[')
.add(b']')
.add(b':')
.add(b',');
pub trait RestResourceProvider {
fn resources(&self) -> Vec<Resource>;
}
pub type HandlerFunction = Box<
dyn Fn(HttpRequest, web::Payload) -> Box<dyn Future<Item = HttpResponse, Error = ActixError>>
+ Send
+ Sync
+ 'static,
>;
pub struct RestApiShutdownHandle {
do_shutdown: Box<dyn Fn() -> Result<(), RestApiServerError> + Send>,
}
impl RestApiShutdownHandle {
pub fn shutdown(&self) -> Result<(), RestApiServerError> {
(*self.do_shutdown)()
}
}
pub struct Request(HttpRequest, web::Payload);
impl From<(HttpRequest, web::Payload)> for Request {
fn from((http_request, payload): (HttpRequest, web::Payload)) -> Self {
Self(http_request, payload)
}
}
impl Into<(HttpRequest, web::Payload)> for Request {
fn into(self) -> (HttpRequest, web::Payload) {
(self.0, self.1)
}
}
pub struct Response(HttpResponse);
impl From<HttpResponse> for Response {
fn from(res: HttpResponse) -> Self {
Self(res)
}
}
impl IntoFuture for Response {
type Item = HttpResponse;
type Error = ActixError;
type Future = FutureResult<HttpResponse, ActixError>;
fn into_future(self) -> Self::Future {
self.0.into_future()
}
}
impl std::fmt::Debug for Response {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self.0)
}
}
#[derive(Clone)]
pub enum Method {
Get,
Post,
Put,
Patch,
Delete,
Head,
}
impl std::fmt::Display for Method {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Method::Get => f.write_str("GET"),
Method::Post => f.write_str("POST"),
Method::Put => f.write_str("PUT"),
Method::Patch => f.write_str("PATCH"),
Method::Delete => f.write_str("DELETE"),
Method::Head => f.write_str("HEAD"),
}
}
}
#[derive(Clone)]
pub struct Resource {
route: String,
request_guards: Vec<Arc<dyn RequestGuard>>,
methods: Vec<(Method, Arc<HandlerFunction>)>,
}
impl Resource {
#[deprecated(note = "Please use the `build` and `add_method` methods instead")]
pub fn new<F>(method: Method, route: &str, handle: F) -> Self
where
F: Fn(
HttpRequest,
web::Payload,
) -> Box<dyn Future<Item = HttpResponse, Error = ActixError>>
+ Send
+ Sync
+ 'static,
{
Self::build(route).add_method(method, handle)
}
pub fn build(route: &str) -> Self {
Self {
route: route.to_string(),
methods: vec![],
request_guards: vec![],
}
}
pub fn add_method<F>(mut self, method: Method, handle: F) -> Self
where
F: Fn(
HttpRequest,
web::Payload,
) -> Box<dyn Future<Item = HttpResponse, Error = ActixError>>
+ Send
+ Sync
+ 'static,
{
self.methods.push((method, Arc::new(Box::new(handle))));
self
}
pub fn add_request_guard<RG>(mut self, guard: RG) -> Self
where
RG: RequestGuard + Clone + 'static,
{
self.request_guards.push(Arc::new(guard));
self
}
fn into_route(self) -> actix_web::Resource {
let mut resource = web::resource(&self.route);
let mut allowed_methods = self
.methods
.iter()
.map(|(method, _)| method.to_string())
.collect::<Vec<_>>()
.join(", ");
allowed_methods += ", OPTIONS";
resource = resource.route(web::route().guard(actix_web::guard::Options()).to(
move |_: HttpRequest| {
HttpResponse::Ok()
.header(header::ALLOW, allowed_methods.clone())
.finish()
},
));
let request_guards = self.request_guards;
self.methods
.into_iter()
.fold(resource, |resource, (method, handler)| {
let guards = request_guards.clone();
let func = move |r: HttpRequest, p: web::Payload| {
if !guards.is_empty() {
for guard in guards.clone() {
match guard.evaluate(&r) {
Continuation::Terminate(result) => return result,
Continuation::Continue => (),
}
}
}
(handler)(r, p)
};
resource.route(match method {
Method::Get => web::get().to_async(func),
Method::Post => web::post().to_async(func),
Method::Put => web::put().to_async(func),
Method::Patch => web::patch().to_async(func),
Method::Delete => web::delete().to_async(func),
Method::Head => web::head().to_async(func),
})
})
}
}
pub enum Continuation {
Continue,
Terminate(Box<dyn Future<Item = HttpResponse, Error = ActixError>>),
}
impl Continuation {
pub fn terminate<F>(fut: F) -> Continuation
where
F: Future<Item = HttpResponse, Error = ActixError> + 'static,
{
Continuation::Terminate(Box::new(fut))
}
}
pub trait RequestGuard: Send + Sync {
fn evaluate(&self, req: &HttpRequest) -> Continuation;
}
impl<F> RequestGuard for F
where
F: Fn(&HttpRequest) -> Continuation + Sync + Send,
{
fn evaluate(&self, req: &HttpRequest) -> Continuation {
(*self)(req)
}
}
impl RequestGuard for Box<dyn RequestGuard> {
fn evaluate(&self, req: &HttpRequest) -> Continuation {
(**self).evaluate(req)
}
}
#[derive(Clone)]
pub struct ProtocolVersionRangeGuard {
min: u32,
max: u32,
}
impl ProtocolVersionRangeGuard {
pub fn new(min: u32, max: u32) -> Self {
Self { min, max }
}
}
impl RequestGuard for ProtocolVersionRangeGuard {
fn evaluate(&self, req: &HttpRequest) -> Continuation {
if let Some(header_value) = req.headers().get("SplinterProtocolVersion") {
let parsed_header = header_value
.to_str()
.map_err(|err| {
format!(
"Invalid characters in SplinterProtocolVersion header: {}",
err
)
})
.and_then(|val_str| {
val_str.parse::<u32>().map_err(|_| {
"SplinterProtocolVersion must be a valid positive integer".to_string()
})
});
match parsed_header {
Err(msg) => Continuation::terminate(
HttpResponse::BadRequest()
.json(json!({
"message": msg,
}))
.into_future(),
),
Ok(version) if version < self.min => Continuation::terminate(
HttpResponse::BadRequest()
.json(json!({
"message": format!(
"Client must support protocol version {} or greater.",
self.min,
),
"requested_protocol": version,
"splinter_protocol": self.max,
"libsplinter_version": format!(
"{}.{}.{}",
env!("CARGO_PKG_VERSION_MAJOR"),
env!("CARGO_PKG_VERSION_MINOR"),
env!("CARGO_PKG_VERSION_PATCH")
)
}))
.into_future(),
),
Ok(version) if version > self.max => Continuation::terminate(
HttpResponse::BadRequest()
.json(json!({
"message": format!(
"Client requires a newer protocol than can be provided: {} > {}",
version,
self.max,
),
"requested_protocol": version,
"splinter_protocol": self.max,
"libsplinter_version": format!(
"{}.{}.{}",
env!("CARGO_PKG_VERSION_MAJOR"),
env!("CARGO_PKG_VERSION_MINOR"),
env!("CARGO_PKG_VERSION_PATCH")
)
}))
.into_future(),
),
Ok(_) => Continuation::Continue,
}
} else {
Continuation::Continue
}
}
}
#[derive(Clone)]
pub struct RestApi {
resources: Vec<Resource>,
bind: String,
}
impl RestApi {
pub fn run(
self,
) -> Result<(RestApiShutdownHandle, thread::JoinHandle<()>), RestApiServerError> {
let (tx, rx) = mpsc::channel();
let bind_url = self.bind.to_owned();
let resources = self.resources.to_owned();
let join_handle = thread::Builder::new()
.name("SplinterDRestApi".into())
.spawn(move || {
let sys = actix::System::new("SplinterD-Rest-API");
let mut server = HttpServer::new(move || {
#[cfg(feature = "rest-api-cors")]
let mut app = App::new()
.wrap(middleware::Logger::default())
.wrap(cors::Cors::new_allow_any());
#[cfg(not(feature = "rest-api-cors"))]
let mut app = App::new().wrap(middleware::Logger::default());
for resource in resources.clone() {
app = app.service(resource.into_route());
}
app
});
server = match server.bind(&bind_url) {
Ok(server) => server,
Err(err) => {
let error_msg = format!("Invalid REST API bind {}: {}", bind_url, err);
error!("{}", error_msg);
if let Err(err) = tx.send(Err(error_msg)) {
error!("Failed to notify receiver of bind error: {}", err);
}
return;
}
};
let addr = server.disable_signals().system_exit().start();
if let Err(err) = tx.send(Ok(addr)) {
error!("Unable to send Server Addr: {}", err);
}
if let Err(err) = sys.run() {
error!("REST Api unexpectedly exiting: {}", err);
};
info!("Rest API terminating");
})?;
let addr = rx
.recv()
.map_err(|err| {
RestApiServerError::StartUpError(format!("Unable to receive Server Addr: {}", err))
})?
.map_err(|err| {
RestApiServerError::BindError(format!(
"Failed to bind to URL {}: {}",
self.bind, err
))
})?;
let do_shutdown = Box::new(move || {
debug!("Shutting down Rest API");
if let Err(err) = addr.stop(true).wait() {
error!("An error occured while shutting down rest API: {:?}", err);
}
debug!("Graceful signal sent to Rest API");
Ok(())
});
Ok((RestApiShutdownHandle { do_shutdown }, join_handle))
}
}
pub struct RestApiBuilder {
resources: Vec<Resource>,
bind: Option<String>,
}
impl Default for RestApiBuilder {
fn default() -> Self {
Self {
resources: Vec::new(),
bind: None,
}
}
}
impl RestApiBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_bind(mut self, value: &str) -> Self {
self.bind = Some(value.to_string());
self
}
pub fn add_resource(mut self, value: Resource) -> Self {
self.resources.push(value);
self
}
pub fn add_resources(mut self, mut values: Vec<Resource>) -> Self {
self.resources.append(&mut values);
self
}
pub fn build(self) -> Result<RestApi, RestApiServerError> {
let bind = self
.bind
.ok_or_else(|| RestApiServerError::MissingField("bind".to_string()))?;
Ok(RestApi {
bind,
resources: self.resources,
})
}
}
pub fn into_protobuf<M: Message>(
payload: web::Payload,
) -> impl Future<Item = M, Error = ActixError> {
payload
.from_err::<ActixError>()
.fold(web::BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, ActixError>(body)
})
.and_then(|body| match protobuf::parse_from_bytes::<M>(&body) {
Ok(proto) => Ok(proto),
Err(err) => Err(ErrorBadRequest(json!({ "message": format!("{}", err) }))),
})
.into_future()
}
pub fn into_bytes(payload: web::Payload) -> impl Future<Item = Vec<u8>, Error = ActixError> {
payload
.from_err::<ActixError>()
.fold(web::BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, ActixError>(body)
})
.and_then(|body| Ok(body.to_vec()))
.into_future()
}
pub fn percent_encode_filter_query(input: &str) -> String {
percent_encoding::utf8_percent_encode(input, QUERY_ENCODE_SET).to_string()
}
pub fn require_header(header_key: &str, request: &HttpRequest) -> Result<String, RequestError> {
let header = request.headers().get(header_key).ok_or_else(|| {
RequestError::MissingHeader(format!("Header {} not included in Request", header_key))
})?;
Ok(header
.to_str()
.map_err(|err| RequestError::InvalidHeaderValue(format!("Invalid header value: {}", err)))?
.to_string())
}
pub fn get_authorization_token(request: &HttpRequest) -> Result<String, RequestError> {
let auth_header = require_header("Authorization", &request)?;
Ok(auth_header
.split_whitespace()
.last()
.ok_or_else(|| {
RequestError::InvalidHeaderValue(
"Authorization token not included in request".to_string(),
)
})?
.to_string())
}
#[cfg(test)]
mod test {
use super::*;
use actix_http::Response;
use futures::IntoFuture;
#[test]
fn test_resource() {
Resource::build("/test")
.add_method(Method::Get, |_: HttpRequest, _: web::Payload| {
Box::new(Response::Ok().finish().into_future())
})
.into_route();
}
#[test]
fn test_resource_with_guard() {
Resource::build("/test-guarded")
.add_request_guard(|_: &HttpRequest| {
Continuation::terminate(Response::BadRequest().finish().into_future())
})
.add_method(Method::Get, |_: HttpRequest, _: web::Payload| {
Box::new(Response::Ok().finish().into_future())
})
.into_route();
}
}