use std::collections::HashMap;
use std::sync::Arc;
use super::{
MetadataPredicate, Node, NodeIter, RegistryError, RegistryReader, RegistryWriter, RwRegistry,
};
#[derive(Clone)]
pub struct UnifiedRegistry {
internal_source: Arc<dyn RwRegistry>,
external_sources: Vec<Arc<dyn RegistryReader>>,
}
impl UnifiedRegistry {
pub fn new(
internal_source: Box<dyn RwRegistry>,
external_sources: Vec<Box<dyn RegistryReader>>,
) -> Self {
Self {
internal_source: internal_source.into(),
external_sources: external_sources.into_iter().map(Arc::from).collect(),
}
}
fn all_nodes<'a>(&'a self) -> Box<dyn Iterator<Item = Node> + 'a> {
Box::new(
self.external_sources
.iter()
.map(|registry| registry.list_nodes(&[]))
.rev()
.chain(std::iter::once(self.internal_source.list_nodes(&[])))
.filter_map(|res| {
res.map_err(|err| debug!("Failed to list nodes in source registry: {}", err))
.ok()
})
.flatten(),
)
}
}
impl RegistryReader for UnifiedRegistry {
fn list_nodes<'a, 'b: 'a>(
&'b self,
predicates: &'a [MetadataPredicate],
) -> Result<NodeIter<'a>, RegistryError> {
let mut id_map = self
.all_nodes()
.fold(HashMap::<String, Node>::new(), |mut acc, mut node| {
if let Some(existing) = acc.remove(&node.identity) {
let mut merged_metadata = existing.metadata;
merged_metadata.extend(node.metadata);
node.metadata = merged_metadata;
}
acc.insert(node.identity.clone(), node);
acc
});
id_map.retain(|_, node| predicates.iter().all(|predicate| predicate.apply(node)));
Ok(Box::new(id_map.into_iter().map(|(_, node)| node)))
}
fn count_nodes(&self, predicates: &[MetadataPredicate]) -> Result<u32, RegistryError> {
self.list_nodes(predicates).map(|iter| iter.count() as u32)
}
fn get_node(&self, identity: &str) -> Result<Option<Node>, RegistryError> {
Ok(self
.external_sources
.iter()
.map(|registry| registry.get_node(identity))
.rev()
.chain(std::iter::once(self.internal_source.get_node(identity)))
.filter_map(|res| {
res.map_err(|err| debug!("Failed to fetch node from source registry: {}", err))
.ok()
})
.fold(None, |final_opt, fetch_opt| {
match fetch_opt {
Some(mut node) => {
if let Some(existing) = final_opt {
let mut merged_metadata = existing.metadata;
merged_metadata.extend(node.metadata);
node.metadata = merged_metadata;
}
Some(node)
}
None => final_opt,
}
}))
}
fn has_node(&self, identity: &str) -> Result<bool, RegistryError> {
Ok(self
.internal_source
.has_node(identity)
.unwrap_or_else(|err| {
debug!(
"Failed to check for existence of node in source registry: {}",
err
);
false
})
|| self.external_sources.iter().any(|source| {
source.has_node(identity).unwrap_or_else(|err| {
debug!(
"Failed to check for existence of node in source registry: {}",
err
);
false
})
}))
}
}
impl RegistryWriter for UnifiedRegistry {
fn add_node(&self, node: Node) -> Result<(), RegistryError> {
self.internal_source.add_node(node)
}
fn update_node(&self, node: Node) -> Result<(), RegistryError> {
self.internal_source.update_node(node)
}
fn delete_node(&self, identity: &str) -> Result<Option<Node>, RegistryError> {
self.internal_source.delete_node(identity)
}
}
impl RwRegistry for UnifiedRegistry {
fn clone_box(&self) -> Box<dyn RwRegistry> {
Box::new(self.clone())
}
fn clone_box_as_reader(&self) -> Box<dyn RegistryReader> {
Box::new(self.clone())
}
fn clone_box_as_writer(&self) -> Box<dyn RegistryWriter> {
Box::new(self.clone())
}
}
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::iter::FromIterator;
use std::sync::{Arc, Mutex};
use super::*;
use crate::error::InvalidStateError;
fn new_node(id: &str, endpoint: &str, metadata: &[(&str, &str)]) -> Node {
let mut builder = Node::builder(id).with_endpoint(endpoint).with_key("abcd");
for (key, val) in metadata {
builder = builder.with_metadata(*key, *val);
}
builder.build().expect("Failed to build node")
}
#[test]
fn node_count_empty() {
let unified = UnifiedRegistry::new(
Box::new(MemRegistry::default()),
vec![Box::new(MemRegistry::default())],
);
assert_eq!(0, unified.count_nodes(&[]).expect("Unable to get count"));
}
#[test]
fn node_count_multiple() {
let node1 = new_node("node1", "endpoint1", &[("meta_a", "val_a")]);
let node2 = new_node("node2", "endpoint2", &[("meta_b", "val_b")]);
let node3 = new_node("node1", "endpoint3", &[("meta_c", "val_c")]);
let writeable = MemRegistry::default();
writeable.add_node(node1).expect("Unable to insert node1");
writeable.add_node(node2).expect("Unable to insert node2");
let readable = MemRegistry::default();
writeable.add_node(node3).expect("Unable to insert node3");
let unified = UnifiedRegistry::new(Box::new(writeable), vec![Box::new(readable)]);
assert_eq!(2, unified.count_nodes(&[]).expect("Unable to get count"));
}
#[test]
fn node_count_with_predicates() {
let node1 = new_node(
"node1",
"endpoint1",
&[("meta_a", "val_a"), ("meta_b", "val_b")],
);
let node2 = new_node(
"node2",
"endpoint2",
&[("meta_a", "val_c"), ("meta_b", "val_b")],
);
let node3 = new_node(
"node1",
"endpoint3",
&[("meta_a", "val_a"), ("meta_b", "val_c")],
);
let writeable = MemRegistry::default();
writeable.add_node(node1).expect("Unable to insert node1");
writeable.add_node(node2).expect("Unable to insert node2");
let readable = MemRegistry::default();
readable.add_node(node3).expect("Unable to insert node3");
let unified = UnifiedRegistry::new(Box::new(writeable), vec![Box::new(readable)]);
assert_eq!(
1,
unified
.count_nodes(&[
MetadataPredicate::eq("meta_a", "val_a"),
MetadataPredicate::ne("meta_b", "val_c")
])
.expect("Unable to get count")
);
}
#[test]
fn get_node_read_only() {
let node = new_node("node1", "endpoint1", &[("meta_a", "val_a")]);
let readable = MemRegistry::default();
readable
.add_node(node.clone())
.expect("Unable to insert node");
let unified =
UnifiedRegistry::new(Box::new(MemRegistry::default()), vec![Box::new(readable)]);
let retreived_node = unified
.get_node("node1")
.expect("Unable to fetch node")
.expect("Node not found");
assert_eq!(node, retreived_node);
}
#[test]
fn get_node_internal() {
let node = new_node("node1", "endpoint1", &[("meta_a", "val_a")]);
let writable = MemRegistry::default();
writable
.add_node(node.clone())
.expect("Unable to insert node");
let unified =
UnifiedRegistry::new(Box::new(writable), vec![Box::new(MemRegistry::default())]);
let retreived_node = unified
.get_node("node1")
.expect("Unable to fetch node")
.expect("Node not found");
assert_eq!(node, retreived_node);
}
#[test]
fn get_node_read_only_precedence() {
let high_precedence_node = new_node("node1", "endpoint1", &[("meta_a", "val_a")]);
let med_precedence_node = new_node("node1", "endpoint2", &[("meta_b", "val_b")]);
let low_precedence_node = new_node("node1", "endpoint3", &[("meta_a", "val_c")]);
let expected_node = new_node(
"node1",
"endpoint1",
&[("meta_a", "val_a"), ("meta_b", "val_b")],
);
let high_precedence_readable = MemRegistry::default();
high_precedence_readable
.add_node(high_precedence_node)
.expect("Unable to insert high-precedence node");
let med_precedence_readable = MemRegistry::default();
med_precedence_readable
.add_node(med_precedence_node)
.expect("Unable to insert medium-precedence node");
let low_precedence_readable = MemRegistry::default();
low_precedence_readable
.add_node(low_precedence_node)
.expect("Unable to insert low-precedence node");
let unified = UnifiedRegistry::new(
Box::new(MemRegistry::default()),
vec![
Box::new(high_precedence_readable),
Box::new(med_precedence_readable),
Box::new(low_precedence_readable),
],
);
let retreived_node = unified
.get_node("node1")
.expect("Unable to fetch node")
.expect("Node not found");
assert_eq!(expected_node, retreived_node);
}
#[test]
fn get_node_internal_precedence() {
let high_precedence_node = new_node("node1", "endpoint1", &[("meta_a", "val_a")]);
let med_precedence_node = new_node("node1", "endpoint2", &[("meta_b", "val_b")]);
let low_precedence_node = new_node("node1", "endpoint3", &[("meta_a", "val_c")]);
let expected_node = new_node(
"node1",
"endpoint1",
&[("meta_a", "val_a"), ("meta_b", "val_b")],
);
let writable = MemRegistry::default();
writable
.add_node(high_precedence_node)
.expect("Unable to insert high-precedence node");
let med_precedence_readable = MemRegistry::default();
med_precedence_readable
.add_node(med_precedence_node)
.expect("Unable to insert medium-precedence node");
let low_precedence_readable = MemRegistry::default();
low_precedence_readable
.add_node(low_precedence_node)
.expect("Unable to insert low-precedence node");
let unified = UnifiedRegistry::new(
Box::new(writable),
vec![
Box::new(med_precedence_readable),
Box::new(low_precedence_readable),
],
);
let retreived_node = unified
.get_node("node1")
.expect("Unable to fetch node")
.expect("Node not found");
assert_eq!(expected_node, retreived_node);
}
#[test]
fn has_node() {
let node1 = new_node("node1", "endpoint1", &[]);
let node2 = new_node("node2", "endpoint2", &[]);
let writable = MemRegistry::default();
writable
.add_node(node1.clone())
.expect("Unable to insert node");
let readable = MemRegistry::default();
readable
.add_node(node2.clone())
.expect("Unable to insert node");
let unified = UnifiedRegistry::new(Box::new(writable), vec![Box::new(readable)]);
assert!(unified
.has_node(&node1.identity)
.expect("Failed to check if node1 exists"));
assert!(unified
.has_node(&node2.identity)
.expect("Failed to check if node2 exists"));
assert!(!unified
.has_node("NodeNotInRegistry")
.expect("Failed to check for non-existent node"));
}
#[test]
fn list_nodes_precedence() {
let node1_internal = new_node("node1", "endpoint1", &[("meta_a", "val_a")]);
let node1_read_only = new_node(
"node1",
"endpoint3",
&[("meta_a", "val_c"), ("meta_b", "val_b")],
);
let node2_high = new_node("node2", "endpoint2", &[("meta_a", "val_a")]);
let node2_low = new_node(
"node2",
"endpoint3",
&[("meta_a", "val_c"), ("meta_b", "val_b")],
);
let expected_nodes = HashMap::from_iter(vec![
(
"node1".to_string(),
new_node(
"node1",
"endpoint1",
&[("meta_a", "val_a"), ("meta_b", "val_b")],
),
),
(
"node2".to_string(),
new_node(
"node2",
"endpoint2",
&[("meta_a", "val_a"), ("meta_b", "val_b")],
),
),
]);
let writable = MemRegistry::default();
writable
.add_node(node1_internal)
.expect("Unable to insert internal node1");
let readable_high = MemRegistry::default();
readable_high
.add_node(node1_read_only)
.expect("Unable to insert read-only node1");
readable_high
.add_node(node2_high)
.expect("Unable to insert high-precedence node2");
let readable_low = MemRegistry::default();
readable_low
.add_node(node2_low)
.expect("Unable to insert low-precedence node2");
let unified = UnifiedRegistry::new(
Box::new(writable),
vec![Box::new(readable_high), Box::new(readable_low)],
);
let nodes = unified
.list_nodes(&[])
.expect("Unable to list nodes")
.map(|node| (node.identity.clone(), node))
.collect::<HashMap<_, _>>();
assert_eq!(expected_nodes, nodes);
}
#[test]
fn list_nodes_with_predicates() {
let node1 = new_node(
"node1",
"endpoint1",
&[("meta_a", "val_a"), ("meta_b", "val_b")],
);
let node2 = new_node(
"node2",
"endpoint2",
&[("meta_a", "val_c"), ("meta_b", "val_b")],
);
let node3 = new_node(
"node1",
"endpoint3",
&[("meta_a", "val_a"), ("meta_b", "val_c")],
);
let writeable = MemRegistry::default();
writeable
.add_node(node1.clone())
.expect("Unable to insert node1");
writeable.add_node(node2).expect("Unable to insert node2");
let readable = MemRegistry::default();
readable.add_node(node3).expect("Unable to insert node3");
let unified = UnifiedRegistry::new(Box::new(writeable), vec![Box::new(readable)]);
let predicates = vec![
MetadataPredicate::eq("meta_a", "val_a"),
MetadataPredicate::ne("meta_b", "val_c"),
];
let mut nodes = unified
.list_nodes(&predicates)
.expect("Unable to get count");
assert_eq!(Some(node1), nodes.next());
assert_eq!(None, nodes.next());
}
#[test]
fn write_nodes() {
let node1 = new_node("node1", "endpoint1", &[("meta_a", "val_a")]);
let node2 = new_node("node2", "endpoint2", &[("meta_b", "val_b")]);
let writeable = MemRegistry::default();
let readable = MemRegistry::default();
readable
.add_node(node2.clone())
.expect("Unable to insert node2 into read-only registry");
let unified = UnifiedRegistry::new(
Box::new(writeable.clone()),
vec![Box::new(readable.clone())],
);
unified
.add_node(node1.clone())
.expect("Unable to add node1");
assert!(unified
.has_node(&node1.identity)
.expect("Unable to check unified for node1"));
assert!(writeable
.has_node(&node1.identity)
.expect("Unable to check writeable for node1"));
assert!(!readable
.has_node(&node1.identity)
.expect("Unable to check readable for node1"));
assert!(unified
.delete_node(&node2.identity)
.expect("Unable to remove node2")
.is_none());
assert!(unified
.has_node(&node2.identity)
.expect("Unable to check unified for node2"));
assert!(readable
.has_node(&node2.identity)
.expect("Unable to check readable for node2"));
assert_eq!(
Some(node1.clone()),
unified
.delete_node(&node1.identity)
.expect("Unable to remove node1")
);
assert!(!unified
.has_node(&node1.identity)
.expect("Unable to check unified for node1"));
assert!(!writeable
.has_node(&node1.identity)
.expect("Unable to check writeable for node1"));
}
#[derive(Clone, Default)]
struct MemRegistry {
nodes: Arc<Mutex<HashMap<String, Node>>>,
}
impl RegistryReader for MemRegistry {
fn list_nodes<'a, 'b: 'a>(
&'b self,
predicates: &'a [MetadataPredicate],
) -> Result<NodeIter<'a>, RegistryError> {
let mut nodes = self
.nodes
.lock()
.expect("mem registry lock was poisoned")
.clone();
nodes.retain(|_, node| predicates.iter().all(|predicate| predicate.apply(node)));
Ok(Box::new(nodes.into_iter().map(|(_, node)| node)))
}
fn count_nodes(&self, predicates: &[MetadataPredicate]) -> Result<u32, RegistryError> {
self.list_nodes(predicates).map(|iter| iter.count() as u32)
}
fn get_node(&self, identity: &str) -> Result<Option<Node>, RegistryError> {
Ok(self
.nodes
.lock()
.expect("mem registry lock was poisoned")
.get(identity)
.cloned())
}
}
impl RegistryWriter for MemRegistry {
fn add_node(&self, node: Node) -> Result<(), RegistryError> {
self.nodes
.lock()
.expect("mem registry lock was poisoned")
.insert(node.identity.clone(), node);
Ok(())
}
fn update_node(&self, node: Node) -> Result<(), RegistryError> {
let mut inner = self.nodes.lock().expect("mem registry lock was poisoned");
if inner.contains_key(&node.identity) {
inner.insert(node.identity.clone(), node);
Ok(())
} else {
Err(RegistryError::InvalidStateError(
InvalidStateError::with_message(format!(
"Node does not exist in the registry: {}",
node.identity
)),
))
}
}
fn delete_node(&self, identity: &str) -> Result<Option<Node>, RegistryError> {
Ok(self
.nodes
.lock()
.expect("mem registry lock was poisoned")
.remove(identity))
}
}
impl RwRegistry for MemRegistry {
fn clone_box(&self) -> Box<dyn RwRegistry> {
Box::new(self.clone())
}
fn clone_box_as_reader(&self) -> Box<dyn RegistryReader> {
Box::new(self.clone())
}
fn clone_box_as_writer(&self) -> Box<dyn RegistryWriter> {
Box::new(self.clone())
}
}
}