use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::Write as _;
use crate::actix_web::{error::BlockingError, web, Error, HttpRequest, HttpResponse};
use crate::error::InvalidStateError;
use crate::futures::{future::IntoFuture, stream::Stream, Future};
use crate::registry::rest_api::error::RegistryRestApiError;
#[cfg(feature = "authorization")]
use crate::registry::rest_api::{REGISTRY_READ_PERMISSION, REGISTRY_WRITE_PERMISSION};
use crate::registry::{
rest_api::resources::nodes::{ListNodesResponse, NewNode, NodeResponse},
MetadataPredicate, Node, RegistryReader, RegistryWriter, RwRegistry,
};
use crate::rest_api::{
actix_web_1::{Method, ProtocolVersionRangeGuard, Resource},
paging::{get_response_paging_info, DEFAULT_LIMIT, DEFAULT_OFFSET},
percent_encode_filter_query, ErrorResponse, SPLINTER_PROTOCOL_VERSION,
};
/// Lower bound of the protocol version range accepted by the `/registry/nodes` resource; it is
/// passed to `ProtocolVersionRangeGuard::new` together with `SPLINTER_PROTOCOL_VERSION`.
const REGISTRY_LIST_NODES_MIN: u32 = 1;
/// Deserialized form of the `filter` query parameter: each key maps to an `(operator, value)`
/// pair, which `to_predicates` converts into a `MetadataPredicate`.
type Filter = HashMap<String, (String, String)>;
/// Builds the `/registry/nodes` REST resource.
///
/// The resource is guarded by a `ProtocolVersionRangeGuard` created from
/// `REGISTRY_LIST_NODES_MIN` and `SPLINTER_PROTOCOL_VERSION`, and exposes two methods:
///
/// * `GET` is served by `list_nodes`, using a reader cloned from `registry` for each request
/// * `POST` is served by `add_node`, using a clone of `registry` for each request
///
/// When the `authorization` feature is enabled, the methods are registered with
/// `REGISTRY_READ_PERMISSION` and `REGISTRY_WRITE_PERMISSION` respectively.
pub fn make_nodes_resource(registry: Box<dyn RwRegistry>) -> Resource {
    // Both handler closures are `move`, so the POST handler needs its own registry handle.
    let post_registry = registry.clone();

    let version_guard =
        ProtocolVersionRangeGuard::new(REGISTRY_LIST_NODES_MIN, SPLINTER_PROTOCOL_VERSION);
    let builder = Resource::build("/registry/nodes").add_request_guard(version_guard);

    #[cfg(feature = "authorization")]
    {
        builder
            .add_method(Method::Get, REGISTRY_READ_PERMISSION, move |request, _| {
                let reader = registry.clone_box_as_reader();
                list_nodes(request, web::Data::new(reader))
            })
            .add_method(
                Method::Post,
                REGISTRY_WRITE_PERMISSION,
                move |_, payload| {
                    let writer = post_registry.clone();
                    add_node(payload, web::Data::new(writer))
                },
            )
    }
    #[cfg(not(feature = "authorization"))]
    {
        builder
            .add_method(Method::Get, move |request, _| {
                let reader = registry.clone_box_as_reader();
                list_nodes(request, web::Data::new(reader))
            })
            .add_method(Method::Post, move |_, payload| {
                let writer = post_registry.clone();
                add_node(payload, web::Data::new(writer))
            })
    }
}
fn list_nodes(
req: HttpRequest,
registry: web::Data<Box<dyn RegistryReader>>,
) -> Box<dyn Future<Item = HttpResponse, Error = Error>> {
let query: web::Query<HashMap<String, String>> =
if let Ok(q) = web::Query::from_query(req.query_string()) {
q
} else {
return Box::new(
HttpResponse::BadRequest()
.json(ErrorResponse::bad_request("Invalid query"))
.into_future(),
);
};
let offset = match query.get("offset") {
Some(value) => match value.parse::<usize>() {
Ok(val) => val,
Err(err) => {
return Box::new(
HttpResponse::BadRequest()
.json(ErrorResponse::bad_request(&format!(
"Invalid offset value passed: {}. Error: {}",
value, err
)))
.into_future(),
)
}
},
None => DEFAULT_OFFSET,
};
let limit = match query.get("limit") {
Some(value) => match value.parse::<usize>() {
Ok(val) => val,
Err(err) => {
return Box::new(
HttpResponse::BadRequest()
.json(ErrorResponse::bad_request(&format!(
"Invalid limit value passed: {}. Error: {}",
value, err
)))
.into_future(),
)
}
},
None => DEFAULT_LIMIT,
};
let mut link = format!("{}?", req.uri().path());
let filters = match query.get("filter") {
Some(value) => match serde_json::from_str(value) {
Ok(val) => {
if let Err(e) = write!(link, "filter={}&", percent_encode_filter_query(value)) {
return Box::new(
HttpResponse::InternalServerError()
.body(e.to_string())
.into_future(),
);
}
Some(val)
}
Err(err) => {
return Box::new(
HttpResponse::BadRequest()
.json(ErrorResponse::bad_request(&format!(
"Invalid filter value passed: {}. Error: {}",
value, err
)))
.into_future(),
)
}
},
None => None,
};
let predicates = match to_predicates(filters) {
Ok(predicates) => predicates,
Err(err) => {
return Box::new(
HttpResponse::BadRequest()
.json(ErrorResponse::bad_request(&format!(
"Invalid predicate: {}",
err
)))
.into_future(),
)
}
};
Box::new(query_list_nodes(
registry,
link,
predicates,
Some(offset),
Some(limit),
))
}
/// Lists the nodes matching `filters` and builds the paged `ListNodesResponse`.
///
/// The registry query runs inside `web::block`. `total` is the length of the iterator returned
/// by the registry, taken before `offset` and `limit` are applied. A missing `offset` is
/// treated as `0` and a missing `limit` as `total`.
///
/// # Arguments
///
/// * `registry` - reader used to list the nodes
/// * `link` - base link handed to `get_response_paging_info`
/// * `filters` - predicates forwarded to `RegistryReader::list_nodes`
/// * `offset` - number of matching nodes to skip
/// * `limit` - maximum number of nodes to return
///
/// Any failure of the blocking task is logged and answered with a
/// `500 Internal Server Error`.
fn query_list_nodes(
    registry: web::Data<Box<dyn RegistryReader>>,
    link: String,
    filters: Vec<MetadataPredicate>,
    offset: Option<usize>,
    limit: Option<usize>,
) -> impl Future<Item = HttpResponse, Error = Error> {
    web::block(move || {
        let matching = registry
            .list_nodes(&filters)
            .map_err(RegistryRestApiError::from)?;

        // The count must be read before the iterator is consumed by skip/take.
        let total = matching.len() as usize;
        let skip_count = offset.unwrap_or(0);
        let take_count = limit.unwrap_or(total);

        let page: Vec<_> = matching.skip(skip_count).take(take_count).collect();
        Ok((page, link, limit, offset, total))
    })
    .then(|res: Result<_, BlockingError<RegistryRestApiError>>| {
        let response = match res {
            Err(err) => {
                error!("Unable to list nodes: {}", err);
                HttpResponse::InternalServerError().json(ErrorResponse::internal_error())
            }
            Ok((page, link, limit, offset, total_count)) => {
                let body = ListNodesResponse {
                    data: page.iter().map(NodeResponse::from).collect(),
                    paging: get_response_paging_info(limit, offset, &link, total_count),
                };
                HttpResponse::Ok().json(body)
            }
        };
        Ok(response)
    })
}
/// Converts an optional `Filter` into a list of `MetadataPredicate`s.
///
/// Each `(key, (operator, value))` entry becomes one predicate. Supported operators are `=`,
/// `>`, `<`, `>=`, `<=` and `!=`. `None` produces an empty list.
///
/// # Errors
///
/// Returns `"<operator> is not a valid operator"` for the first entry whose operator is not
/// one of the supported ones.
fn to_predicates(filters: Option<Filter>) -> Result<Vec<MetadataPredicate>, String> {
    let filters = match filters {
        Some(filters) => filters,
        None => return Ok(vec![]),
    };

    let mut predicates = Vec::with_capacity(filters.len());
    for (key, (operator, value)) in filters {
        let predicate = match operator.as_str() {
            "=" => MetadataPredicate::Eq(key, value),
            ">" => MetadataPredicate::Gt(key, value),
            "<" => MetadataPredicate::Lt(key, value),
            ">=" => MetadataPredicate::Ge(key, value),
            "<=" => MetadataPredicate::Le(key, value),
            "!=" => MetadataPredicate::Ne(key, value),
            _ => return Err(format!("{} is not a valid operator", operator)),
        };
        predicates.push(predicate);
    }
    Ok(predicates)
}
fn add_node(
payload: web::Payload,
registry: web::Data<Box<dyn RwRegistry>>,
) -> Box<dyn Future<Item = HttpResponse, Error = Error>> {
Box::new(
payload
.from_err::<Error>()
.fold(web::BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, Error>(body)
})
.into_future()
.and_then(move |body| match serde_json::from_slice::<NewNode>(&body) {
Ok(node) => Box::new(
web::block(move || {
let new_node = Node::try_from(node).map_err(|err| {
RegistryRestApiError::InvalidStateError(
InvalidStateError::with_message(format!(
"Failed to add node, node is invalid: {}",
err
)),
)
})?;
registry
.add_node(new_node)
.map_err(RegistryRestApiError::from)
})
.then(|res| {
Ok(match res {
Ok(_) => HttpResponse::Ok().finish(),
Err(BlockingError::Error(RegistryRestApiError::InvalidStateError(
err,
))) => HttpResponse::BadRequest().json(ErrorResponse::bad_request(
&format!("Invalid node: {}", err),
)),
Err(err) => {
error!("Unable to add node: {}", err);
HttpResponse::InternalServerError()
.json(ErrorResponse::internal_error())
}
})
}),
)
as Box<dyn Future<Item = HttpResponse, Error = Error>>,
Err(err) => Box::new(
HttpResponse::BadRequest()
.json(ErrorResponse::bad_request(&format!(
"Invalid node: {}",
err
)))
.into_future(),
),
}),
)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use reqwest::{blocking::Client, StatusCode, Url};
use serde_json::{to_value, Value as JsonValue};
use crate::error::InvalidStateError;
use crate::registry::{error::RegistryError, NodeIter};
use crate::rest_api::{
actix_web_1::{RestApiBuilder, RestApiShutdownHandle},
paging::Paging,
};
/// Verifies that `GET /registry/nodes` without query parameters returns both registered nodes
/// and the expected paging block (offset 0, limit 100, total 2).
#[test]
fn test_list_nodes_ok() {
    let registry = MemRegistry::new(vec![get_node_1(), get_node_2()]);
    let (shutdown_handle, join_handle, bind_url) =
        run_rest_api_on_open_port(vec![make_nodes_resource(Box::new(registry))]);

    let endpoint = format!("http://{}/registry/nodes", bind_url);
    let url = Url::parse(&endpoint).expect("Failed to parse URL");
    let resp = Client::new()
        .get(url)
        .header("SplinterProtocolVersion", SPLINTER_PROTOCOL_VERSION)
        .send()
        .expect("Failed to perform request");
    assert_eq!(resp.status(), StatusCode::OK);

    let body: JsonValue = resp.json().expect("Failed to deserialize body");

    // The order of the returned nodes is not asserted, only their presence.
    let nodes = body
        .get("data")
        .expect("No data field in response")
        .as_array()
        .expect("data field is not an array")
        .to_vec();
    assert_eq!(2, nodes.len());

    let expected_node_1 = to_value(NodeResponse::from(&get_node_1()))
        .expect("Failed to convert node1 to JsonValue");
    assert!(nodes.contains(&expected_node_1));

    let expected_node_2 = to_value(NodeResponse::from(&get_node_2()))
        .expect("Failed to convert node2 to JsonValue");
    assert!(nodes.contains(&expected_node_2));

    let actual_paging = body.get("paging").expect("no paging field in response");
    let expected_paging = to_value(create_test_paging_response(
        0,
        100,
        0,
        0,
        0,
        2,
        "/registry/nodes?",
    ))
    .expect("failed to convert expected paging");
    assert_eq!(actual_paging, &expected_paging);

    shutdown_handle
        .shutdown()
        .expect("Unable to shutdown rest api");
    join_handle.join().expect("Unable to join rest api thread");
}
/// Verifies that a `company = "Bitwise IO"` filter returns only node 1 and that the paging
/// links carry the percent-encoded filter.
#[test]
fn test_list_nodes_with_filters_ok() {
    let registry = MemRegistry::new(vec![get_node_1(), get_node_2()]);
    let (shutdown_handle, join_handle, bind_url) =
        run_rest_api_on_open_port(vec![make_nodes_resource(Box::new(registry))]);

    let filter = percent_encode_filter_query("{\"company\":[\"=\",\"Bitwise IO\"]}");
    let endpoint = format!("http://{}/registry/nodes?filter={}", bind_url, filter);
    let url = Url::parse(&endpoint).expect("Failed to parse URL");
    let resp = Client::new()
        .get(url)
        .header("SplinterProtocolVersion", SPLINTER_PROTOCOL_VERSION)
        .send()
        .expect("Failed to perform request");
    assert_eq!(resp.status(), StatusCode::OK);

    let body: JsonValue = resp.json().expect("Failed to deserialize body");

    let actual_data = body.get("data").expect("no data field in response");
    let expected_data = to_value(vec![NodeResponse::from(&get_node_1())])
        .expect("failed to convert expected data");
    assert_eq!(actual_data, &expected_data);

    let actual_paging = body.get("paging").expect("no paging field in response");
    let expected_paging = to_value(create_test_paging_response(
        0,
        100,
        0,
        0,
        0,
        1,
        &format!("/registry/nodes?filter={}&", filter),
    ))
    .expect("failed to convert expected paging");
    assert_eq!(actual_paging, &expected_paging);

    shutdown_handle
        .shutdown()
        .expect("Unable to shutdown rest api");
    join_handle.join().expect("Unable to join rest api thread");
}
/// Verifies that a filter using the unsupported `*` operator is rejected with
/// `400 Bad Request`.
#[test]
fn test_list_node_with_filters_bad_request() {
    let registry = MemRegistry::new(vec![get_node_1(), get_node_2()]);
    let (shutdown_handle, join_handle, bind_url) =
        run_rest_api_on_open_port(vec![make_nodes_resource(Box::new(registry))]);

    let filter = percent_encode_filter_query("{\"company\":[\"*\",\"Bitwise IO\"]}");
    let endpoint = format!("http://{}/registry/nodes?filter={}", bind_url, filter);
    let url = Url::parse(&endpoint).expect("Failed to parse URL");
    let resp = Client::new()
        .get(url)
        .header("SplinterProtocolVersion", SPLINTER_PROTOCOL_VERSION)
        .send()
        .expect("Failed to perform request");
    assert_eq!(resp.status(), StatusCode::BAD_REQUEST);

    shutdown_handle
        .shutdown()
        .expect("Unable to shutdown rest api");
    join_handle.join().expect("Unable to join rest api thread");
}
/// Verifies `POST /registry/nodes`:
///
/// 1. a request without a body is rejected with `400 Bad Request`
/// 2. a request carrying a valid `NewNode` as JSON is accepted with `200 OK`
#[test]
fn test_add_node() {
    let registry = MemRegistry::default();
    let (shutdown_handle, join_handle, bind_url) =
        run_rest_api_on_open_port(vec![make_nodes_resource(Box::new(registry))]);

    // No body at all.
    let empty_post_url = Url::parse(&format!("http://{}/registry/nodes", bind_url))
        .expect("Failed to parse URL");
    let empty_post_resp = Client::new()
        .post(empty_post_url)
        .header("SplinterProtocolVersion", SPLINTER_PROTOCOL_VERSION)
        .send()
        .expect("Failed to perform request");
    assert_eq!(empty_post_resp.status(), StatusCode::BAD_REQUEST);

    // Well-formed node.
    let valid_post_url = Url::parse(&format!("http://{}/registry/nodes", bind_url))
        .expect("Failed to parse URL");
    let valid_post_resp = Client::new()
        .post(valid_post_url)
        .header("SplinterProtocolVersion", SPLINTER_PROTOCOL_VERSION)
        .json(&get_new_node_1())
        .send()
        .expect("Failed to perform request");
    assert_eq!(valid_post_resp.status(), StatusCode::OK);

    shutdown_handle
        .shutdown()
        .expect("Unable to shutdown rest api");
    join_handle.join().expect("Unable to join rest api thread");
}
/// Starts an insecure REST API serving `resources`, bound to `127.0.0.1:0`.
///
/// Returns the shutdown handle, the join handle of the REST API thread, and the address
/// (`127.0.0.1:<port>`) to send requests to. The port is the first entry reported by
/// `RestApiShutdownHandle::port_numbers`.
///
/// # Panics
///
/// Panics if the REST API cannot be built or started.
fn run_rest_api_on_open_port(
    resources: Vec<Resource>,
) -> (RestApiShutdownHandle, std::thread::JoinHandle<()>, String) {
    #[cfg(not(feature = "https-bind"))]
    let bind = "127.0.0.1:0";
    #[cfg(feature = "https-bind")]
    let bind = crate::rest_api::BindConfig::Http("127.0.0.1:0".into());

    // `resources` is owned and not needed afterwards, so it is moved rather than cloned.
    let result = RestApiBuilder::new()
        .with_bind(bind)
        .add_resources(resources)
        .build_insecure()
        .expect("Failed to build REST API")
        .run_insecure();

    match result {
        Ok((shutdown_handle, join_handle)) => {
            let port = shutdown_handle.port_numbers()[0];
            (shutdown_handle, join_handle, format!("127.0.0.1:{}", port))
        }
        Err(err) => panic!("Failed to run REST API: {}", err),
    }
}
/// Builds the `Paging` value a test expects in a response.
///
/// Every link has the form `<link>limit=<limit>&offset=<n>`, where `<n>` is `offset` for
/// `current`, `0` for `first`, and the matching `*_offset` argument for `next`, `prev` and
/// `last`.
fn create_test_paging_response(
    offset: usize,
    limit: usize,
    next_offset: usize,
    previous_offset: usize,
    last_offset: usize,
    total: usize,
    link: &str,
) -> Paging {
    let base_link = format!("{}limit={}&", link, limit);
    let link_at = |target_offset: usize| format!("{}offset={}", base_link, target_offset);

    Paging {
        current: link_at(offset),
        offset,
        limit,
        total,
        first: format!("{}offset=0", base_link),
        prev: link_at(previous_offset),
        next: link_at(next_offset),
        last: link_at(last_offset),
    }
}
/// Returns the first test node: identity `Node-123` with `company` metadata `Bitwise IO`.
fn get_node_1() -> Node {
    let builder = Node::builder("Node-123")
        .with_endpoint("12.0.0.123:8431")
        .with_display_name("Bitwise IO - Node 1")
        .with_key("0123")
        .with_metadata("company", "Bitwise IO");

    builder.build().expect("Failed to build node1")
}
/// Returns a `NewNode` payload carrying the same values as `get_node_1`.
fn get_new_node_1() -> NewNode {
    let mut company_metadata = HashMap::new();
    company_metadata.insert("company".into(), "Bitwise IO".into());

    let endpoints = vec!["12.0.0.123:8431".into()];
    let keys = vec!["0123".into()];

    NewNode {
        identity: "Node-123".into(),
        endpoints,
        display_name: "Bitwise IO - Node 1".into(),
        keys,
        metadata: company_metadata,
    }
}
/// Returns the second test node: identity `Node-456` with `company` metadata `Cargill`.
fn get_node_2() -> Node {
    let builder = Node::builder("Node-456")
        .with_endpoint("13.0.0.123:8434")
        .with_display_name("Cargill - Node 1")
        .with_key("abcd")
        .with_metadata("company", "Cargill");

    builder.build().expect("Failed to build node2")
}
/// In-memory registry used by the tests; nodes are keyed by their identity.
///
/// Clones share the same underlying map through the `Arc`.
#[derive(Clone, Default)]
struct MemRegistry {
    nodes: Arc<Mutex<HashMap<String, Node>>>,
}

impl MemRegistry {
    /// Creates a registry pre-populated with `nodes`. If two nodes share an identity, the
    /// later one replaces the earlier one.
    fn new(nodes: Vec<Node>) -> Self {
        let by_identity: HashMap<String, Node> = nodes
            .into_iter()
            .map(|node| (node.identity.clone(), node))
            .collect();

        Self {
            nodes: Arc::new(Mutex::new(by_identity)),
        }
    }
}
// Read access to the in-memory node map.
impl RegistryReader for MemRegistry {
    /// Returns an iterator over a copy of the nodes for which every predicate applies.
    fn list_nodes<'a, 'b: 'a>(
        &'b self,
        predicates: &'a [MetadataPredicate],
    ) -> Result<NodeIter<'a>, RegistryError> {
        // Filter a snapshot of the map; the lock guard is dropped at the end of this statement.
        let mut snapshot = self
            .nodes
            .lock()
            .expect("mem registry lock was poisoned")
            .clone();
        snapshot.retain(|_, candidate| predicates.iter().all(|check| check.apply(candidate)));

        let matching = snapshot.into_iter().map(|(_, node)| node);
        Ok(Box::new(matching))
    }

    /// Counts the nodes that `list_nodes` would return for `predicates`.
    fn count_nodes(&self, predicates: &[MetadataPredicate]) -> Result<u32, RegistryError> {
        let matching = self.list_nodes(predicates)?;
        Ok(matching.count() as u32)
    }

    /// Returns a copy of the node stored under `identity`, if any.
    fn get_node(&self, identity: &str) -> Result<Option<Node>, RegistryError> {
        let guard = self.nodes.lock().expect("mem registry lock was poisoned");
        Ok(guard.get(identity).cloned())
    }
}
// Write access to the in-memory node map.
impl RegistryWriter for MemRegistry {
    /// Stores `node` under its identity, replacing any node already stored there.
    fn add_node(&self, node: Node) -> Result<(), RegistryError> {
        let mut guard = self.nodes.lock().expect("mem registry lock was poisoned");
        guard.insert(node.identity.clone(), node);
        Ok(())
    }

    /// Replaces the stored node that has the same identity as `node`.
    ///
    /// Returns `RegistryError::InvalidStateError` when no node with that identity exists.
    fn update_node(&self, node: Node) -> Result<(), RegistryError> {
        let mut guard = self.nodes.lock().expect("mem registry lock was poisoned");
        match guard.get_mut(&node.identity) {
            Some(existing) => {
                *existing = node;
                Ok(())
            }
            None => Err(RegistryError::InvalidStateError(
                InvalidStateError::with_message(format!(
                    "Node does not exist in the registry: {}",
                    node.identity
                )),
            )),
        }
    }

    /// Removes the node stored under `identity` and returns it, or `None` if it was absent.
    fn delete_node(&self, identity: &str) -> Result<Option<Node>, RegistryError> {
        let mut guard = self.nodes.lock().expect("mem registry lock was poisoned");
        Ok(guard.remove(identity))
    }
}
impl RwRegistry for MemRegistry {
fn clone_box(&self) -> Box<dyn RwRegistry> {
Box::new(self.clone())
}
fn clone_box_as_reader(&self) -> Box<dyn RegistryReader> {
Box::new(self.clone())
}
fn clone_box_as_writer(&self) -> Box<dyn RegistryWriter> {
Box::new(self.clone())
}
}
}