use std::sync::Arc;
use super::{
MetadataPredicate, Node, NodeRegistryError, NodeRegistryReader, NodeRegistryWriter,
RwNodeRegistry,
};
/// A node registry that presents one read/write ("local") registry together with any number of
/// read-only registries as a single registry.
///
/// Both fields hold `Arc`s, so the derived `Clone` yields a handle that shares the same
/// underlying sources rather than copying them.
#[derive(Clone)]
pub struct UnifiedNodeRegistry {
/// The single read/write source.
local_source: Arc<dyn RwNodeRegistry>,
/// Additional sources that are only ever read from, kept in the order they were supplied.
readable_sources: Vec<Arc<dyn NodeRegistryReader>>,
}
impl UnifiedNodeRegistry {
    /// Builds a unified registry on top of the given sources.
    ///
    /// # Arguments
    ///
    /// * `local_source` - the read/write registry
    /// * `readable_sources` - the read-only registries; their relative order is preserved
    ///
    /// Every boxed source is converted into an `Arc` so that the resulting registry can be
    /// cloned while still sharing the same sources.
    pub fn new(
        local_source: Box<dyn RwNodeRegistry>,
        readable_sources: Vec<Box<dyn NodeRegistryReader>>,
    ) -> Self {
        let shared_local: Arc<dyn RwNodeRegistry> = Arc::from(local_source);

        let mut shared_readers: Vec<Arc<dyn NodeRegistryReader>> =
            Vec::with_capacity(readable_sources.len());
        for reader in readable_sources {
            shared_readers.push(Arc::from(reader));
        }

        Self {
            local_source: shared_local,
            readable_sources: shared_readers,
        }
    }
}
/// Boxed, `Send` iterator over owned `Node`s that may borrow data for the lifetime `'a`.
type NodeIter<'a> = Box<dyn Iterator<Item = Node> + Send + 'a>;
impl NodeRegistryReader for UnifiedNodeRegistry {
fn list_nodes<'a, 'b: 'a>(
&'b self,
predicates: &'a [MetadataPredicate],
) -> Result<NodeIter<'a>, NodeRegistryError> {
self.readable_sources
.iter()
.map(|registry| registry.list_nodes(predicates))
.fold(self.local_source.list_nodes(predicates), |acc, iter| {
let local_source = self.local_source.clone();
acc.and_then(|chained| {
let res: NodeIter<'a> = Box::new(chained.chain(iter?.filter(move |node| {
match local_source.has_node(&node.identity) {
Ok(exists) => !exists,
Err(err) => {
error!(
"unable to load local entry for {}; using read-only copy: {}",
node.identity, err
);
false
}
}
})));
Ok(res)
})
})
}
fn count_nodes(&self, predicates: &[MetadataPredicate]) -> Result<u32, NodeRegistryError> {
let local_source_count = self.local_source.count_nodes(predicates)?;
self.readable_sources
.iter()
.map(|source| source.count_nodes(predicates))
.fold(Ok(local_source_count), |acc, count| {
acc.and_then(|total| Ok(total + count?))
})
}
fn fetch_node(&self, identity: &str) -> Result<Node, NodeRegistryError> {
match self.local_source.fetch_node(identity) {
Ok(node) => return Ok(node),
Err(NodeRegistryError::NotFoundError(_)) => (),
Err(err) => return Err(err),
}
self.readable_sources
.iter()
.map(|source| source.fetch_node(identity))
.filter(|res| match res {
Err(NodeRegistryError::NotFoundError(_)) => false,
_ => true,
})
.find(Result::is_ok)
.unwrap_or_else(|| Err(NodeRegistryError::NotFoundError(identity.to_string())))
}
}
impl NodeRegistryWriter for UnifiedNodeRegistry {
    /// Inserts `node` by delegating to the local source, returning whatever it returns.
    fn insert_node(&self, node: Node) -> Result<(), NodeRegistryError> {
        let writable = &self.local_source;
        writable.insert_node(node)
    }

    /// Deletes the node with the given identity by delegating to the local source, returning
    /// whatever it returns.
    fn delete_node(&self, identity: &str) -> Result<(), NodeRegistryError> {
        let writable = &self.local_source;
        writable.delete_node(identity)
    }
}
impl RwNodeRegistry for UnifiedNodeRegistry {
    /// Returns a boxed clone of this registry; the clone shares the same underlying sources.
    fn clone_box(&self) -> Box<dyn RwNodeRegistry> {
        let duplicate = Clone::clone(self);
        Box::new(duplicate)
    }
}
#[cfg(test)]
mod test {
    use std::collections::{BTreeMap, VecDeque};
    use std::sync::{Arc, Mutex};

    use super::*;

    /// Builds a `Node` with the given identity, a fixed test endpoint and the listed
    /// `key => value` metadata entries.
    macro_rules! node {
        ($identity:expr, $($key:expr => $val:expr),*) => {
            {
                let mut built = Node::new($identity, "test://example.com");
                $(
                    built.metadata.insert($key.into(), $val.into());
                )*
                built
            }
        };
    }

    /// Creates an in-memory registry pre-populated with `nodes`.
    fn registry_holding(nodes: Vec<Node>) -> MemRegistry {
        let registry = MemRegistry::default();
        for entry in nodes {
            registry.insert_node(entry).expect("Unable to insert node");
        }
        registry
    }

    /// Wraps a local registry and a single read-only registry in a `UnifiedNodeRegistry`.
    fn unified_over(local: MemRegistry, read_only: MemRegistry) -> UnifiedNodeRegistry {
        UnifiedNodeRegistry::new(Box::new(local), vec![Box::new(read_only)])
    }

    /// A node that exists only in the read-only source is fetched from it.
    #[test]
    fn test_unified_fetch_node_readable() {
        let read_only = registry_holding(vec![node!("node1", "meta_a" => "a value")]);
        let local = MemRegistry::default();
        let unified = unified_over(local, read_only);

        let fetched = unified.fetch_node("node1").expect("Unable to fetch node");

        assert_eq!(fetched, node!("node1", "meta_a" => "a value"));
    }

    /// A node that exists only in the local source is fetched from it.
    #[test]
    fn test_unified_fetch_node_local() {
        let read_only = MemRegistry::default();
        let local = registry_holding(vec![node!("node1", "meta_b" => "b value")]);
        let unified = unified_over(local, read_only);

        let fetched = unified.fetch_node("node1").expect("Unable to fetch node");

        assert_eq!(fetched, node!("node1", "meta_b" => "b value"));
    }

    /// When both sources hold the same identity, the local version is the one fetched.
    #[test]
    fn test_unified_fetch_node_local_selected() {
        let read_only = registry_holding(vec![node!("node1", "meta_a" => "a value")]);
        let local = registry_holding(vec![node!("node1", "meta_b" => "b value")]);
        let unified = unified_over(local, read_only);

        let fetched = unified.fetch_node("node1").expect("Unable to fetch node");

        assert_eq!(fetched, node!("node1", "meta_b" => "b value"));
    }

    /// Listing yields the local version of a shared identity, then the read-only-only node.
    #[test]
    fn test_unified_iteration_local_selected() {
        let read_only = registry_holding(vec![
            node!("node1", "meta_a" => "a value"),
            node!("node2", "meta_c" => "c value"),
        ]);
        let local = registry_holding(vec![node!("node1", "meta_b" => "b value")]);
        let unified = unified_over(local, read_only);

        let listed: Vec<Node> = unified
            .list_nodes(&[])
            .expect("Unable to list nodes")
            .collect();

        assert_eq!(
            listed,
            vec![
                node!("node1", "meta_b" => "b value"),
                node!("node2", "meta_c" => "c value"),
            ]
        );
    }

    /// Predicates are applied while listing, and a read-only node shadowed by a local entry
    /// stays hidden even when the local entry itself is filtered out.
    #[test]
    fn test_unified_iteration_filtering() {
        let read_only = registry_holding(vec![
            node!("node1", "meta_a" => "a value"),
            node!("node2", "meta_c" => "c value"),
        ]);
        let local = registry_holding(vec![node!("node1", "meta_b" => "b value")]);
        let unified = unified_over(local, read_only);

        let predicates = vec![MetadataPredicate::eq("meta_b", "b value")];
        let matching: Vec<Node> = unified
            .list_nodes(&predicates)
            .expect("Unable to list nodes")
            .collect();
        assert_eq!(matching, vec![node!("node1", "meta_b" => "b value")]);

        let predicates = vec![MetadataPredicate::ne("meta_b", "b value")];
        let matching: Vec<Node> = unified
            .list_nodes(&predicates)
            .expect("Unable to list nodes")
            .collect();
        assert_eq!(matching, vec![node!("node2", "meta_c" => "c value")]);
    }

    /// Minimal in-memory registry used as both the local and the read-only source in tests.
    #[derive(Clone, Default)]
    struct MemRegistry {
        nodes: Arc<Mutex<BTreeMap<String, Node>>>,
    }

    impl NodeRegistryReader for MemRegistry {
        /// Returns an iterator over a snapshot of the stored nodes that satisfy every predicate,
        /// in key order.
        fn list_nodes<'a, 'b: 'a>(
            &'b self,
            predicates: &'a [MetadataPredicate],
        ) -> Result<Box<dyn Iterator<Item = Node> + Send + 'a>, NodeRegistryError> {
            let stored = self.nodes.lock().expect("mem registry lock was poisoned");

            let mut snapshot = VecDeque::new();
            for candidate in stored.values() {
                if predicates.iter().all(|predicate| predicate.apply(candidate)) {
                    snapshot.push_back(candidate.clone());
                }
            }

            Ok(Box::new(SnapShotIter { snapshot }))
        }

        /// Counts the nodes that `list_nodes` would yield for the same predicates.
        fn count_nodes(&self, predicates: &[MetadataPredicate]) -> Result<u32, NodeRegistryError> {
            let matching = self.list_nodes(predicates)?;
            Ok(matching.count() as u32)
        }

        /// Returns a clone of the stored node, or `NotFoundError` when the identity is unknown.
        fn fetch_node(&self, identity: &str) -> Result<Node, NodeRegistryError> {
            let stored = self.nodes.lock().expect("mem registry lock was poisoned");
            match stored.get(identity) {
                Some(found) => Ok(found.clone()),
                None => Err(NodeRegistryError::NotFoundError(identity.to_string())),
            }
        }
    }

    impl NodeRegistryWriter for MemRegistry {
        /// Stores the node under its identity, replacing any previous entry.
        fn insert_node(&self, node: Node) -> Result<(), NodeRegistryError> {
            let mut stored = self.nodes.lock().expect("mem registry lock was poisoned");
            let key = node.identity.clone();
            stored.insert(key, node);
            Ok(())
        }

        /// Removes the entry for the identity, if any; a missing entry is not an error.
        fn delete_node(&self, identity: &str) -> Result<(), NodeRegistryError> {
            let mut stored = self.nodes.lock().expect("mem registry lock was poisoned");
            stored.remove(identity);
            Ok(())
        }
    }

    impl RwNodeRegistry for MemRegistry {
        fn clone_box(&self) -> Box<dyn RwNodeRegistry> {
            let duplicate = Clone::clone(self);
            Box::new(duplicate)
        }
    }

    /// Iterator that drains an owned snapshot from the front.
    struct SnapShotIter<V: Send + Clone> {
        snapshot: VecDeque<V>,
    }

    impl<V: Send + Clone> Iterator for SnapShotIter<V> {
        type Item = V;

        fn next(&mut self) -> Option<V> {
            self.snapshot.pop_front()
        }
    }
}