use std::path::Path;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::thread;
use std::time::{Duration, Instant};
use openssl::hash::{hash, MessageDigest};
use crate::hex::to_hex;
use crate::registry::{
validate_nodes, MetadataPredicate, Node, NodeIter, RegistryError, RegistryReader,
};
use super::LocalYamlRegistry;
pub struct RemoteYamlRegistry {
internal: Arc<Mutex<Internal>>,
shutdown_handle: ShutdownHandle,
}
impl RemoteYamlRegistry {
pub fn new(
url: &str,
cache_dir: &str,
automatic_refresh_period: Option<Duration>,
forced_refresh_period: Option<Duration>,
) -> Result<Self, RegistryError> {
let internal = Arc::new(Mutex::new(Internal::new(
url,
cache_dir,
forced_refresh_period,
)?));
let running = automatic_refresh_period
.map::<Result<_, RegistryError>, _>(|refresh_period| {
let running = Arc::new(AtomicBool::new(true));
let thread_internal = internal.clone();
let thread_url = url.to_string();
let thread_running = running.clone();
thread::Builder::new()
.name(format!("Remote Registry Automatic Refresh: {}", url))
.spawn(move || {
automatic_refresh_loop(
refresh_period,
thread_internal,
&thread_url,
thread_running,
)
})
.map_err(|err| {
RegistryError::general_error_with_source(
&format!(
"Failed to spawn automatic refresh thread for remote registry '{}'",
url
),
Box::new(err),
)
})?;
Ok(running)
})
.transpose()?;
let shutdown_handle = ShutdownHandle { running };
Ok(Self {
internal,
shutdown_handle,
})
}
pub fn shutdown_handle(&self) -> ShutdownHandle {
self.shutdown_handle.clone()
}
fn get_nodes(&self) -> Result<Vec<Node>, RegistryError> {
self.internal
.lock()
.map_err(|_| RegistryError::general_error("Internal lock poisoned"))?
.get_nodes()
}
}
impl RegistryReader for RemoteYamlRegistry {
fn fetch_node(&self, identity: &str) -> Result<Option<Node>, RegistryError> {
Ok(self
.get_nodes()?
.into_iter()
.find(|node| node.identity == identity))
}
fn list_nodes<'a, 'b: 'a>(
&'b self,
predicates: &'a [MetadataPredicate],
) -> Result<NodeIter<'a>, RegistryError> {
let mut nodes = self.get_nodes()?;
nodes.retain(|node| predicates.iter().all(|predicate| predicate.apply(node)));
Ok(Box::new(nodes.into_iter()))
}
fn count_nodes(&self, predicates: &[MetadataPredicate]) -> Result<u32, RegistryError> {
Ok(self
.get_nodes()?
.iter()
.filter(move |node| predicates.iter().all(|predicate| predicate.apply(node)))
.count() as u32)
}
}
struct Internal {
url: String,
cache: LocalYamlRegistry,
last_refresh_successful: bool,
forced_refresh_period: Option<Duration>,
next_forced_refresh: Option<Instant>,
}
impl Internal {
fn new(
url: &str,
cache_dir: &str,
forced_refresh_period: Option<Duration>,
) -> Result<Self, RegistryError> {
let url = url.to_string();
let cache = LocalYamlRegistry::new(&compute_cache_filename(&url, cache_dir)?)?;
let mut internal = Self {
url,
cache,
last_refresh_successful: false,
forced_refresh_period,
next_forced_refresh: None,
};
if let Err(err) = internal.refresh_cache() {
warn!(
"Couldn't initialize cache on startup of remote registry '{}': {}",
internal.url, err
);
}
Ok(internal)
}
fn refresh_cache(&mut self) -> Result<(), RegistryError> {
fetch_nodes_from_remote(&self.url)
.and_then(|nodes| self.cache.write_nodes(nodes))
.map_err(|err| {
self.last_refresh_successful = false;
err
})
.and_then(|_| {
self.last_refresh_successful = true;
self.next_forced_refresh = self
.forced_refresh_period
.map(|duration| {
Instant::now().checked_add(duration).ok_or_else(|| {
RegistryError::general_error(
"Forced refresh time could not be determined; \
forced_refresh_period may be too large",
)
})
})
.transpose()?;
Ok(())
})
}
fn get_nodes(&mut self) -> Result<Vec<Node>, RegistryError> {
if !self.last_refresh_successful {
match self.refresh_cache() {
Ok(_) => debug!("Successfully refreshed remote registy '{}'", self.url),
Err(err) => debug!("Failed to refresh remote registry '{}': {}", self.url, err),
}
}
else if self
.next_forced_refresh
.map(|instant| instant < Instant::now())
.unwrap_or(false)
{
match self.refresh_cache() {
Ok(_) => debug!("Forced refresh of remote registy '{}' successful", self.url),
Err(err) => warn!(
"Forced refresh of remote registry '{}' failed: {}",
self.url, err
),
}
}
self.cache.get_nodes()
}
}
fn compute_cache_filename(url: &str, cache_dir: &str) -> Result<String, RegistryError> {
let hash = hash(MessageDigest::sha256(), url.as_bytes())
.map(|digest| to_hex(&*digest))
.map_err(|err| {
RegistryError::general_error_with_source(
"Failed to hash URL for cache file",
Box::new(err),
)
})?;
let filename = format!("remote_registry_{}.yaml", hash);
Ok(Path::new(cache_dir)
.join(filename)
.to_str()
.expect("path built from &str cannot be invalid")
.to_string())
}
fn fetch_nodes_from_remote(url: &str) -> Result<Vec<Node>, RegistryError> {
let bytes = reqwest::blocking::get(url)
.and_then(|response| response.error_for_status())
.map_err(|err| {
RegistryError::general_error_with_source(
&format!("Failed to fetch remote registry file from {}", url),
Box::new(err),
)
})?
.bytes()
.map_err(|err| {
RegistryError::general_error_with_source(
"Failed to get bytes from remote registry file HTTP response",
Box::new(err),
)
})?;
let nodes: Vec<Node> = serde_yaml::from_slice(&bytes).map_err(|_| {
RegistryError::general_error(
"Failed to deserialize remote registry file: Not a valid YAML sequence of nodes",
)
})?;
validate_nodes(&nodes)?;
Ok(nodes)
}
fn automatic_refresh_loop(
refresh_period: Duration,
internal: Arc<Mutex<Internal>>,
url: &str,
running: Arc<AtomicBool>,
) {
loop {
let refresh_time = Instant::now() + refresh_period;
while Instant::now() < refresh_time {
if !running.load(Ordering::SeqCst) {
return;
}
if let Some(time_left) = refresh_time.checked_duration_since(Instant::now()) {
thread::sleep(std::cmp::min(time_left, Duration::from_secs(1)));
}
}
let mut internal = match internal.lock() {
Ok(internal) => internal,
Err(_) => {
warn!("Internal lock poisoned for remote registry '{}'", url);
continue;
}
};
let previous_refresh_successful = internal.last_refresh_successful;
match internal.refresh_cache() {
Ok(_) => debug!("Automatic refresh of remote registy '{}' successful", url),
Err(err) => {
let err_msg = format!(
"Automatic refresh of remote registry '{}' failed: {}",
url, err
);
if previous_refresh_successful {
warn!("{}", err_msg)
} else {
debug!("{}", err_msg)
}
}
}
}
}
#[derive(Clone)]
pub struct ShutdownHandle {
running: Option<Arc<AtomicBool>>,
}
impl ShutdownHandle {
pub fn shutdown(&self) {
if let Some(running) = &self.running {
running.store(false, Ordering::SeqCst)
}
}
}
#[cfg(all(test, feature = "rest-api", feature = "rest-api-actix"))]
mod tests {
use super::*;
use std::fs::File;
use actix_web::HttpResponse;
use futures::future::IntoFuture;
use tempdir::TempDir;
use crate::rest_api::{
Method, Resource, RestApiBuilder, RestApiServerError, RestApiShutdownHandle,
};
#[test]
fn duplicate_identity() {
let mut registry = mock_registry();
registry[0].identity = "identity".into();
registry[1].identity = "identity".into();
let test_config = TestConfig::setup("duplicate_identity", Some(registry));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn duplicate_endpoint() {
let mut registry = mock_registry();
registry[0].endpoints = vec!["endpoint".into()];
registry[1].endpoints = vec!["endpoint".into()];
let test_config = TestConfig::setup("duplicate_endpoint", Some(registry));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn empty_identity() {
let mut registry = mock_registry();
registry[0].identity = "".into();
let test_config = TestConfig::setup("empty_identity", Some(registry));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn empty_endpoint() {
let mut registry = mock_registry();
registry[0].endpoints = vec!["".into()];
let test_config = TestConfig::setup("empty_endpoint", Some(registry));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn empty_display_name() {
let mut registry = mock_registry();
registry[0].display_name = "".into();
let test_config = TestConfig::setup("empty_display_name", Some(registry));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn empty_key() {
let mut registry = mock_registry();
registry[0].keys = vec!["".into()];
let test_config = TestConfig::setup("empty_key", Some(registry));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn missing_endpoints() {
let mut registry = mock_registry();
registry[0].endpoints = vec![];
let test_config = TestConfig::setup("missing_endpoints", Some(registry));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn missing_keys() {
let mut registry = mock_registry();
registry[0].keys = vec![];
let test_config = TestConfig::setup("missing_keys", Some(registry));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn fetch_node_ok() {
let test_config = TestConfig::setup("fetch_node_ok", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let expected_node = mock_registry().pop().expect("Failed to get expected node");
let node = remote_registry
.fetch_node(&expected_node.identity)
.expect("Failed to fetch node")
.expect("Node not found");
assert_eq!(node, expected_node);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn fetch_node_not_found() {
let test_config = TestConfig::setup("fetch_node_not_found", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
assert!(remote_registry
.fetch_node("NodeNotInRegistry")
.expect("Failed to fetch node")
.is_none());
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn list_nodes() {
let test_config = TestConfig::setup("list_nodes", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let nodes = remote_registry
.list_nodes(&[])
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert_eq!(nodes.len(), mock_registry().len());
for node in mock_registry() {
assert!(nodes.contains(&node));
}
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn list_nodes_empty() {
let test_config = TestConfig::setup("list_nodes_empty", Some(vec![]));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let nodes = remote_registry
.list_nodes(&[])
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert!(nodes.is_empty());
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn list_nodes_filter_metadata() {
let test_config = TestConfig::setup("list_nodes_filter_metadata", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let filter = vec![MetadataPredicate::Eq(
"company".into(),
mock_registry()[0]
.metadata
.get("company")
.expect("company metadata not set")
.into(),
)];
let nodes = remote_registry
.list_nodes(&filter)
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0], mock_registry()[0]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn list_nodes_filter_multiple() {
let test_config = TestConfig::setup("list_nodes_filter_multiple", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let filter = vec![
MetadataPredicate::Eq(
"company".to_string(),
mock_registry()[2]
.metadata
.get("company")
.unwrap()
.to_string(),
),
MetadataPredicate::Eq(
"admin".to_string(),
mock_registry()[2]
.metadata
.get("admin")
.unwrap()
.to_string(),
),
];
let nodes = remote_registry
.list_nodes(&filter)
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0], mock_registry()[2]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn list_nodes_filter_empty() {
let test_config = TestConfig::setup("list_nodes_filter_empty", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
let filter = vec![MetadataPredicate::Eq(
"admin".to_string(),
"not an admin".to_string(),
)];
let nodes = remote_registry
.list_nodes(&filter)
.expect("Failed to retrieve nodes")
.collect::<Vec<_>>();
assert!(nodes.is_empty());
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn file_available_at_startup() {
let test_config = TestConfig::setup("file_available_at_startup", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn file_unavailable_at_startup() {
let test_config = TestConfig::setup("file_unavailable_at_startup", None);
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
test_config.update_registry(Some(mock_registry()));
verify_internal_cache(&test_config, &remote_registry, mock_registry());
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn auto_refresh_disabled() {
let test_config = TestConfig::setup("auto_refresh_disabled", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
assert!(remote_registry.shutdown_handle().running.is_none());
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn auto_refresh_enabled() {
let test_config = TestConfig::setup("auto_refresh_enabled", Some(mock_registry()));
let refresh_period = Duration::from_secs(1);
let remote_registry = RemoteYamlRegistry::new(
test_config.url(),
test_config.path(),
Some(refresh_period),
None,
)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
assert!(remote_registry.shutdown_handle().running.is_some());
test_config.update_registry(Some(vec![]));
std::thread::sleep(refresh_period * 2);
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn forced_refresh_disabled() {
let test_config = TestConfig::setup("forced_refresh_disabled", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
test_config.update_registry(Some(vec![]));
verify_internal_cache(&test_config, &remote_registry, mock_registry());
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn forced_refresh_enabled() {
let test_config = TestConfig::setup("forced_refresh_enabled", Some(mock_registry()));
let refresh_period = Duration::from_millis(10);
let remote_registry = RemoteYamlRegistry::new(
test_config.url(),
test_config.path(),
None,
Some(refresh_period),
)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
test_config.update_registry(Some(vec![]));
std::thread::sleep(refresh_period);
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn restart_file_available() {
let test_config = TestConfig::setup("restart_file_available", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
remote_registry.shutdown_handle().shutdown();
test_config.update_registry(Some(vec![]));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, vec![]);
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
#[test]
fn restart_file_unavailable() {
let test_config = TestConfig::setup("restart_file_unavailable", Some(mock_registry()));
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
remote_registry.shutdown_handle().shutdown();
test_config.update_registry(None);
let remote_registry =
RemoteYamlRegistry::new(test_config.url(), test_config.path(), None, None)
.expect("Failed to create registry");
verify_internal_cache(&test_config, &remote_registry, mock_registry());
remote_registry.shutdown_handle().shutdown();
test_config.shutdown();
}
fn mock_registry() -> Vec<Node> {
vec![
Node::builder("Node-123")
.with_endpoint("tcps://12.0.0.123:8431")
.with_display_name("Bitwise IO - Node 1")
.with_key("abcd")
.with_metadata("company", "Bitwise IO")
.with_metadata("admin", "Bob")
.build()
.expect("Failed to build node1"),
Node::builder("Node-456")
.with_endpoint("tcps://12.0.0.123:8434")
.with_display_name("Cargill - Node 1")
.with_key("0123")
.with_metadata("company", "Cargill")
.with_metadata("admin", "Carol")
.build()
.expect("Failed to build node2"),
Node::builder("Node-789")
.with_endpoint("tcps://12.0.0.123:8435")
.with_display_name("Cargill - Node 2")
.with_key("4567")
.with_metadata("company", "Cargill")
.with_metadata("admin", "Charlie")
.build()
.expect("Failed to build node3"),
]
}
fn verify_internal_cache(
test_config: &TestConfig,
remote_registry: &RemoteYamlRegistry,
expected_registry: Vec<Node>,
) {
assert_eq!(
remote_registry.get_nodes().expect("Failed to get nodes"),
expected_registry,
);
let filename = compute_cache_filename(test_config.url(), test_config.path())
.expect("Failed to compute cache filename");
let file = File::open(filename).expect("Failed to open cache file");
let file_contents: Vec<Node> =
serde_yaml::from_reader(file).expect("Failed to deserialize cache file");
assert_eq!(file_contents, expected_registry);
}
struct TestConfig {
_temp_dir: TempDir,
temp_dir_path: String,
registry: Arc<Mutex<Option<Vec<Node>>>>,
registry_url: String,
rest_api_shutdown_handle: RestApiShutdownHandle,
rest_api_join_handle: std::thread::JoinHandle<()>,
}
impl TestConfig {
fn setup(test_name: &str, registry: Option<Vec<Node>>) -> Self {
let temp_dir = TempDir::new(test_name).expect("Failed to create temp dir");
let temp_dir_path = temp_dir
.path()
.to_str()
.expect("Failed to get path")
.to_string();
let registry = Arc::new(Mutex::new(registry));
let (rest_api_shutdown_handle, rest_api_join_handle, registry_url) =
serve_registry(registry.clone());
Self {
_temp_dir: temp_dir,
temp_dir_path,
registry,
registry_url,
rest_api_shutdown_handle,
rest_api_join_handle,
}
}
fn path(&self) -> &str {
&self.temp_dir_path
}
fn url(&self) -> &str {
&self.registry_url
}
fn update_registry(&self, registry: Option<Vec<Node>>) {
*self.registry.lock().expect("Registry lock poisonsed") = registry;
}
fn shutdown(self) {
self.rest_api_shutdown_handle
.shutdown()
.expect("Unable to shutdown rest api");
self.rest_api_join_handle
.join()
.expect("Unable to join rest api thread");
}
}
fn serve_registry(
registry: Arc<Mutex<Option<Vec<Node>>>>,
) -> (RestApiShutdownHandle, std::thread::JoinHandle<()>, String) {
let (shutdown, join, url) = run_rest_api_on_open_port(vec![Resource::build(
"/registry.yaml",
)
.add_method(Method::Get, move |_, _| {
Box::new(match &*registry.lock().expect("Registry lock poisoned") {
Some(registry) => HttpResponse::Ok()
.body(serde_yaml::to_vec(®istry).expect("Failed to serialize registry file"))
.into_future(),
None => HttpResponse::NotFound().finish().into_future(),
})
})]);
(shutdown, join, format!("http://{}/registry.yaml", url))
}
fn run_rest_api_on_open_port(
resources: Vec<Resource>,
) -> (RestApiShutdownHandle, std::thread::JoinHandle<()>, String) {
(10000..20000)
.find_map(|port| {
let bind_url = format!("127.0.0.1:{}", port);
let result = RestApiBuilder::new()
.with_bind(&bind_url)
.add_resources(resources.clone())
.build()
.expect("Failed to build REST API")
.run();
match result {
Ok((shutdown_handle, join_handle)) => {
Some((shutdown_handle, join_handle, bind_url))
}
Err(RestApiServerError::BindError(_)) => None,
Err(err) => panic!("Failed to run REST API: {}", err),
}
})
.expect("No port available")
}
}