use std::convert::TryFrom;
use std::path::Path;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::thread;
use std::time::{Duration, Instant};
use openssl::hash::{hash, MessageDigest};
use crate::error::{InternalError, InvalidStateError};
use crate::hex::to_hex;
use crate::registry::{
error::InvalidNodeError, validate_nodes, MetadataPredicate, Node, NodeIter, RegistryError,
RegistryReader,
};
use crate::threading::lifecycle::ShutdownHandle;
use super::{LocalYamlRegistry, YamlNode};
pub struct RemoteYamlRegistry {
internal: Arc<Mutex<Internal>>,
shutdown_handle: Option<RemoteYamlShutdownHandle>,
}
impl RemoteYamlRegistry {
pub fn new(
url: &str,
cache_dir: &str,
automatic_refresh_period: Option<Duration>,
forced_refresh_period: Option<Duration>,
) -> Result<Self, RegistryError> {
let internal = Arc::new(Mutex::new(Internal::new(
url,
cache_dir,
forced_refresh_period,
)?));
let (running, join_handle) = {
if let Some(refresh_period) = automatic_refresh_period {
let running = Arc::new(AtomicBool::new(true));
let thread_internal = internal.clone();
let thread_url = url.to_string();
let thread_running = running.clone();
let join_handle = thread::Builder::new()
.name(format!("Remote Registry Automatic Refresh: {}", url))
.spawn(move || {
automatic_refresh_loop(
refresh_period,
thread_internal,
&thread_url,
thread_running,
)
})
.map_err(|err| {
RegistryError::InternalError(InternalError::from_source_with_message(
Box::new(err),
format!(
"Failed to spawn automatic refresh thread for remote registry '{}'",
url
),
))
})?;
(Some(running), Some(join_handle))
} else {
(None, None)
}
};
let shutdown_handle = RemoteYamlShutdownHandle {
running,
join_handle,
};
Ok(Self {
internal,
shutdown_handle: Some(shutdown_handle),
})
}
pub fn take_shutdown_handle(&mut self) -> Option<RemoteYamlShutdownHandle> {
self.shutdown_handle.take()
}
fn get_nodes(&self) -> Result<Vec<Node>, RegistryError> {
self.internal
.lock()
.map_err(|_| {
RegistryError::InternalError(InternalError::with_message(
"Internal lock poisoned".into(),
))
})?
.get_nodes()
}
}
impl RegistryReader for RemoteYamlRegistry {
fn get_node(&self, identity: &str) -> Result<Option<Node>, RegistryError> {
Ok(self
.get_nodes()?
.into_iter()
.find(|node| node.identity == identity))
}
fn list_nodes<'a, 'b: 'a>(
&'b self,
predicates: &'a [MetadataPredicate],
) -> Result<NodeIter<'a>, RegistryError> {
let mut nodes = self.get_nodes()?;
nodes.retain(|node| predicates.iter().all(|predicate| predicate.apply(node)));
Ok(Box::new(nodes.into_iter()))
}
fn count_nodes(&self, predicates: &[MetadataPredicate]) -> Result<u32, RegistryError> {
Ok(self
.get_nodes()?
.iter()
.filter(move |node| predicates.iter().all(|predicate| predicate.apply(node)))
.count() as u32)
}
}
struct Internal {
url: String,
cache: LocalYamlRegistry,
last_refresh_successful: bool,
forced_refresh_period: Option<Duration>,
next_forced_refresh: Option<Instant>,
}
impl Internal {
fn new(
url: &str,
cache_dir: &str,
forced_refresh_period: Option<Duration>,
) -> Result<Self, RegistryError> {
let url = url.to_string();
let cache = LocalYamlRegistry::new(&compute_cache_filename(&url, cache_dir)?)?;
let mut internal = Self {
url,
cache,
last_refresh_successful: false,
forced_refresh_period,
next_forced_refresh: None,
};
if let Err(err) = internal.refresh_cache() {
warn!(
"Couldn't initialize cache on startup of remote registry '{}': {}",
internal.url, err
);
}
Ok(internal)
}
fn refresh_cache(&mut self) -> Result<(), RegistryError> {
fetch_nodes_from_remote(&self.url)
.and_then(|nodes| self.cache.write_nodes(nodes))
.map_err(|err| {
self.last_refresh_successful = false;
err
})
.and_then(|_| {
self.last_refresh_successful = true;
self.next_forced_refresh = self
.forced_refresh_period
.map(|duration| {
Instant::now().checked_add(duration).ok_or_else(|| {
RegistryError::InternalError(InternalError::with_message(
"Forced refresh time could not be determined; \
forced_refresh_period may be too large"
.into(),
))
})
})
.transpose()?;
Ok(())
})
}
fn get_nodes(&mut self) -> Result<Vec<Node>, RegistryError> {
if !self.last_refresh_successful {
match self.refresh_cache() {
Ok(_) => debug!("Successfully refreshed remote registry '{}'", self.url),
Err(err) => debug!("Failed to refresh remote registry '{}': {}", self.url, err),
}
}
else if self
.next_forced_refresh
.map(|instant| instant < Instant::now())
.unwrap_or(false)
{
match self.refresh_cache() {
Ok(_) => debug!(
"Forced refresh of remote registry '{}' successful",
self.url
),
Err(err) => warn!(
"Forced refresh of remote registry '{}' failed: {}",
self.url, err
),
}
}
self.cache.get_nodes()
}
}
fn compute_cache_filename(url: &str, cache_dir: &str) -> Result<String, RegistryError> {
let hash = hash(MessageDigest::sha256(), url.as_bytes())
.map(|digest| to_hex(&*digest))
.map_err(|err| {
RegistryError::InternalError(InternalError::from_source_with_message(
Box::new(err),
"Failed to hash URL for cache file".into(),
))
})?;
let filename = format!("remote_registry_{}.yaml", hash);
Ok(Path::new(cache_dir)
.join(filename)
.to_str()
.expect("path built from &str cannot be invalid")
.to_string())
}
fn fetch_nodes_from_remote(url: &str) -> Result<Vec<Node>, RegistryError> {
let bytes = reqwest::blocking::get(url)
.and_then(|response| response.error_for_status())
.map_err(|err| {
RegistryError::InternalError(InternalError::from_source_with_message(
Box::new(err),
format!("Failed to fetch remote registry file from {}", url),
))
})?
.bytes()
.map_err(|err| {
RegistryError::InternalError(InternalError::from_source_with_message(
Box::new(err),
"Failed to get bytes from remote registry file HTTP response".into(),
))
})?;
let yaml_nodes: Vec<YamlNode> = serde_yaml::from_slice(&bytes).map_err(|_| {
RegistryError::InternalError(InternalError::with_message(
"Failed to deserialize remote registry file: Not a valid YAML sequence of nodes".into(),
))
})?;
let nodes: Vec<Node> = yaml_nodes
.into_iter()
.map(Node::try_from)
.collect::<Result<Vec<Node>, InvalidNodeError>>()
.map_err(|err| {
RegistryError::InvalidStateError(InvalidStateError::with_message(format!(
"Unable to get node list: {}",
err
)))
})?;
validate_nodes(&nodes).map_err(|err| {
RegistryError::InvalidStateError(InvalidStateError::with_message(err.to_string()))
})?;
Ok(nodes)
}
fn automatic_refresh_loop(
refresh_period: Duration,
internal: Arc<Mutex<Internal>>,
url: &str,
running: Arc<AtomicBool>,
) {
loop {
let refresh_time = Instant::now() + refresh_period;
while Instant::now() < refresh_time {
if !running.load(Ordering::SeqCst) {
return;
}
if let Some(time_left) = refresh_time.checked_duration_since(Instant::now()) {
thread::sleep(std::cmp::min(time_left, Duration::from_secs(1)));
}
}
let mut internal = match internal.lock() {
Ok(internal) => internal,
Err(_) => {
warn!("Internal lock poisoned for remote registry '{}'", url);
continue;
}
};
let previous_refresh_successful = internal.last_refresh_successful;
match internal.refresh_cache() {
Ok(_) => debug!("Automatic refresh of remote registry '{}' successful", url),
Err(err) => {
let err_msg = format!(
"Automatic refresh of remote registry '{}' failed: {}",
url, err
);
if previous_refresh_successful {
warn!("{}", err_msg)
} else {
debug!("{}", err_msg)
}
}
}
}
}
pub struct RemoteYamlShutdownHandle {
running: Option<Arc<AtomicBool>>,
join_handle: Option<thread::JoinHandle<()>>,
}
impl ShutdownHandle for RemoteYamlShutdownHandle {
fn signal_shutdown(&mut self) {
if let Some(running) = &self.running {
running.store(false, Ordering::SeqCst)
}
}
fn wait_for_shutdown(self) -> Result<(), InternalError> {
if let Some(join_handle) = self.join_handle {
if join_handle.join().is_err() {
return Err(InternalError::with_message(
"Unable to shutdown remote yaml registry".to_string(),
));
}
}
Ok(())
}
}
#[cfg(all(test, feature = "rest-api-actix-web-1"))]
mod tests {
use super::*;
use std::fs::File;
use actix_web::HttpResponse;
use futures::future::IntoFuture;
use tempdir::TempDir;
use crate::rest_api::actix_web_1::{Method, Resource, RestApiBuilder, RestApiShutdownHandle};
#[cfg(feature = "authorization")]
use crate::rest_api::auth::authorization::Permission;
#[test]
fn duplicate_identity() {
let mut registry = mock_registry();
registry[0].identity = "identity".into();
registry[1].identity = "identity".into();
let test_config = TestConfig::setup("duplicate_identity", Some(registry));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn duplicate_endpoint() {
let mut registry = mock_registry();
registry[0].endpoints = vec!["endpoint".into()];
registry[1].endpoints = vec!["endpoint".into()];
let test_config = TestConfig::setup("duplicate_endpoint", Some(registry));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn empty_identity() {
let mut registry = mock_registry();
registry[0].identity = "".into();
let test_config = TestConfig::setup("empty_identity", Some(registry));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn empty_endpoint() {
let mut registry = mock_registry();
registry[0].endpoints = vec!["".into()];
let test_config = TestConfig::setup("empty_endpoint", Some(registry));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn empty_display_name() {
let mut registry = mock_registry();
registry[0].display_name = "".into();
let test_config = TestConfig::setup("empty_display_name", Some(registry));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn empty_key() {
let mut registry = mock_registry();
registry[0].keys = vec!["".into()];
let test_config = TestConfig::setup("empty_key", Some(registry));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn missing_endpoints() {
let mut registry = mock_registry();
registry[0].endpoints = vec![];
let test_config = TestConfig::setup("missing_endpoints", Some(registry));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn missing_keys() {
let mut registry = mock_registry();
registry[0].keys = vec![];
let test_config = TestConfig::setup("missing_keys", Some(registry));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn fetch_node_ok() {
let test_config = TestConfig::setup("fetch_node_ok", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let expected_node = mock_registry().pop().expect("Failed to get expected node");
let node = remote_registry
.get_node(&expected_node.identity)
.expect("Failed to fetch node")
.expect("Node not found");
assert_eq!(node, expected_node);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn fetch_node_not_found() {
let test_config = TestConfig::setup("fetch_node_not_found", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
assert!(remote_registry
.get_node("NodeNotInRegistry")
.expect("Failed to fetch node")
.is_none());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn has_node() {
let test_config = TestConfig::setup("has_node", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let expected_node = mock_registry().pop().expect("Failed to get expected node");
assert!(remote_registry
.has_node(&expected_node.identity)
.expect("Failed to check if expected_node exists"));
assert!(!remote_registry
.has_node("NodeNotInRegistry")
.expect("Failed to check for non-existent node"));
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn list_nodes() {
let test_config = TestConfig::setup("list_nodes", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let nodes = remote_registry
.list_nodes(&[])
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert_eq!(nodes.len(), mock_registry().len());
for node in mock_registry() {
assert!(nodes.contains(&node));
}
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn list_nodes_empty() {
let test_config = TestConfig::setup("list_nodes_empty", Some(vec![]));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let nodes = remote_registry
.list_nodes(&[])
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert!(nodes.is_empty());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn list_nodes_filter_metadata() {
let test_config = TestConfig::setup("list_nodes_filter_metadata", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let filter = vec![MetadataPredicate::Eq(
"company".into(),
mock_registry()[0]
.metadata
.get("company")
.expect("company metadata not set")
.into(),
)];
let nodes = remote_registry
.list_nodes(&filter)
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0], mock_registry()[0]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn list_nodes_filter_multiple() {
let test_config = TestConfig::setup("list_nodes_filter_multiple", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let filter = vec![
MetadataPredicate::Eq(
"company".to_string(),
mock_registry()[2]
.metadata
.get("company")
.unwrap()
.to_string(),
),
MetadataPredicate::Eq(
"admin".to_string(),
mock_registry()[2]
.metadata
.get("admin")
.unwrap()
.to_string(),
),
];
let nodes = remote_registry
.list_nodes(&filter)
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0], mock_registry()[2]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn list_nodes_filter_empty() {
let test_config = TestConfig::setup("list_nodes_filter_empty", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let filter = vec![MetadataPredicate::Eq(
"admin".to_string(),
"not an admin".to_string(),
)];
let nodes = remote_registry
.list_nodes(&filter)
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert!(nodes.is_empty());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn file_available_at_startup() {
let test_config = TestConfig::setup("file_available_at_startup", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn file_unavailable_at_startup() {
let test_config = TestConfig::setup("file_unavailable_at_startup", None);
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
test_config.update_registry(Some(mock_registry()));
verify_internal_cache(&test_config, &remote_registry, mock_registry());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn auto_refresh_disabled() {
let test_config = TestConfig::setup("auto_refresh_disabled", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
assert!(shutdown_handle.running.is_none());
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn auto_refresh_enabled() {
let test_config = TestConfig::setup("auto_refresh_enabled", Some(mock_registry()));
let refresh_period = Duration::from_secs(1);
let mut remote_registry = RemoteYamlRegistry::new(
test_config.url(),
test_config.path(),
Some(refresh_period),
None,
)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
assert!(shutdown_handle.running.is_some());
test_config.update_registry(Some(vec![]));
std::thread::sleep(refresh_period * 2);
verify_internal_cache(&test_config, &remote_registry, vec![]);
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn forced_refresh_disabled() {
let test_config = TestConfig::setup("forced_refresh_disabled", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
test_config.update_registry(Some(vec![]));
verify_internal_cache(&test_config, &remote_registry, mock_registry());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn forced_refresh_enabled() {
let test_config = TestConfig::setup("forced_refresh_enabled", Some(mock_registry()));
let refresh_period = Duration::from_millis(10);
let mut remote_registry = RemoteYamlRegistry::new(
test_config.url(),
test_config.path(),
None,
Some(refresh_period),
)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
test_config.update_registry(Some(vec![]));
std::thread::sleep(refresh_period);
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn restart_file_available() {
let test_config = TestConfig::setup("restart_file_available", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.update_registry(Some(vec![]));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
#[test]
fn restart_file_unavailable() {
let test_config = TestConfig::setup("restart_file_unavailable", Some(mock_registry()));
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.update_registry(None);
let mut remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
let mut shutdown_handle = remote_registry
.take_shutdown_handle()
.expect("Unable to get shutdown handle");
shutdown_handle.signal_shutdown();
shutdown_handle
.wait_for_shutdown()
.expect("Unable to shutdown remote registry");
test_config.shutdown();
}
fn mock_registry() -> Vec<Node> {
vec![
Node::builder("Node-123")
.with_endpoint("tcps://12.0.0.123:8431")
.with_display_name("Bitwise IO - Node 1")
.with_key("abcd")
.with_metadata("company", "Bitwise IO")
.with_metadata("admin", "Bob")
.build()
.expect("Failed to build node1"),
Node::builder("Node-456")
.with_endpoint("tcps://12.0.0.123:8434")
.with_display_name("Cargill - Node 1")
.with_key("0123")
.with_metadata("company", "Cargill")
.with_metadata("admin", "Carol")
.build()
.expect("Failed to build node2"),
Node::builder("Node-789")
.with_endpoint("tcps://12.0.0.123:8435")
.with_display_name("Cargill - Node 2")
.with_key("4567")
.with_metadata("company", "Cargill")
.with_metadata("admin", "Charlie")
.build()
.expect("Failed to build node3"),
]
}
fn verify_internal_cache(
test_config: &TestConfig,
remote_registry: &RemoteYamlRegistry,
expected_registry: Vec<Node>,
) {
assert_eq!(
remote_registry.get_nodes().expect("Failed to get nodes"),
expected_registry,
);
let filename = compute_cache_filename(test_config.url(), test_config.path())
.expect("Failed to compute cache filename");
let file = File::open(filename).expect("Failed to open cache file");
let file_contents: Vec<YamlNode> =
serde_yaml::from_reader(file).expect("Failed to deserialize cache file");
let file_contents_nodes: Vec<Node> = file_contents
.into_iter()
.map(|node| Node::try_from(node).expect("Unable to build node"))
.collect();
assert_eq!(file_contents_nodes, expected_registry);
}
struct TestConfig {
_temp_dir: TempDir,
temp_dir_path: String,
registry: Arc<Mutex<Option<Vec<Node>>>>,
registry_url: String,
rest_api_shutdown_handle: RestApiShutdownHandle,
rest_api_join_handle: std::thread::JoinHandle<()>,
}
impl TestConfig {
fn setup(test_name: &str, registry: Option<Vec<Node>>) -> Self {
let temp_dir = TempDir::new(test_name).expect("Failed to create temp dir");
let temp_dir_path = temp_dir
.path()
.to_str()
.expect("Failed to get path")
.to_string();
let registry = Arc::new(Mutex::new(registry));
let (rest_api_shutdown_handle, rest_api_join_handle, registry_url) =
serve_registry(registry.clone());
Self {
_temp_dir: temp_dir,
temp_dir_path,
registry,
registry_url,
rest_api_shutdown_handle,
rest_api_join_handle,
}
}
fn path(&self) -> &str {
&self.temp_dir_path
}
fn url(&self) -> &str {
&self.registry_url
}
fn update_registry(&self, registry: Option<Vec<Node>>) {
*self.registry.lock().expect("Registry lock poisonsed") = registry;
}
fn shutdown(self) {
self.rest_api_shutdown_handle
.shutdown()
.expect("Unable to shutdown rest api");
self.rest_api_join_handle
.join()
.expect("Unable to join rest api thread");
}
}
fn serve_registry(
registry: Arc<Mutex<Option<Vec<Node>>>>,
) -> (RestApiShutdownHandle, std::thread::JoinHandle<()>, String) {
let mut resource = Resource::build("/registry.yaml");
#[cfg(feature = "authorization")]
{
resource = resource.add_method(
Method::Get,
Permission::AllowUnauthenticated,
move |_, _| {
Box::new(match &*registry.lock().expect("Registry lock poisoned") {
Some(registry) => {
let yaml_registry: Vec<YamlNode> = registry
.iter()
.map(|node| YamlNode::from(node.clone()))
.collect();
HttpResponse::Ok()
.body(
serde_yaml::to_vec(&yaml_registry)
.expect("Failed to serialize registry file"),
)
.into_future()
}
None => HttpResponse::NotFound().finish().into_future(),
})
},
)
}
#[cfg(not(feature = "authorization"))]
{
resource = resource.add_method(Method::Get, move |_, _| {
Box::new(match &*registry.lock().expect("Registry lock poisoned") {
Some(registry) => {
let yaml_registry: Vec<YamlNode> = registry
.iter()
.map(|node| YamlNode::from(node.clone()))
.collect();
HttpResponse::Ok()
.body(
serde_yaml::to_vec(&yaml_registry)
.expect("Failed to serialize registry file"),
)
.into_future()
}
None => HttpResponse::NotFound().finish().into_future(),
})
})
}
let (shutdown, join, url) = run_rest_api_on_open_port(vec![resource]);
(shutdown, join, format!("http://{}/registry.yaml", url))
}
fn run_rest_api_on_open_port(
resources: Vec<Resource>,
) -> (RestApiShutdownHandle, std::thread::JoinHandle<()>, String) {
#[cfg(not(feature = "https-bind"))]
let bind = "127.0.0.1:0";
#[cfg(feature = "https-bind")]
let bind = crate::rest_api::BindConfig::Http("127.0.0.1:0".into());
let result = RestApiBuilder::new()
.with_bind(bind)
.add_resources(resources.clone())
.build_insecure()
.expect("Failed to build REST API")
.run_insecure();
match result {
Ok((shutdown_handle, join_handle)) => {
let port = shutdown_handle.port_numbers()[0];
(shutdown_handle, join_handle, format!("127.0.0.1:{}", port))
}
Err(err) => panic!("Failed to run REST API: {}", err),
}
}
}