mod cyclon;
mod module;
mod ring;
mod vicinity;
pub use self::cyclon::Cyclon;
pub use self::module::{FilterModule, Module};
pub use self::ring::Rings;
pub use self::vicinity::Vicinity;
use crate::{Id, Node, NodeData};
use std::collections::BTreeMap;
/// State of the local node's view of the network: the local node itself,
/// every node learned so far, and the pluggable modules that consume them.
///
/// Modules and filter modules are stored as boxed trait objects keyed by the
/// `&'static str` returned by their `name()` method, so iteration over them
/// follows the lexicographic order of those names.
pub struct Topology {
/// The node this topology was created for (see `new`).
our_node: Node,
/// Nodes accepted through `update`, keyed by their `Id`.
known_nodes: BTreeMap<Id, NodeData>,
/// Registered modules, keyed by module name.
modules: BTreeMap<&'static str, Box<dyn Module + Send + Sync>>,
/// Registered filter modules, keyed by filter module name.
filter_modules: BTreeMap<&'static str, Box<dyn FilterModule + Send + Sync>>,
}
impl Topology {
    /// Creates a topology around `our_node` with no known nodes and no
    /// modules or filter modules registered.
    pub fn new(our_node: Node) -> Self {
        Self {
            our_node,
            known_nodes: BTreeMap::new(),
            modules: BTreeMap::new(),
            filter_modules: BTreeMap::new(),
        }
    }

    /// Returns a shared reference to the local node.
    pub fn node(&self) -> &Node {
        &self.our_node
    }

    /// Returns an exclusive reference to the local node.
    pub fn node_mut(&mut self) -> &mut Node {
        &mut self.our_node
    }

    /// Creates a topology around `our_node` and registers the `Rings`,
    /// `Vicinity` and `Cyclon` modules, each built with its `default()`
    /// constructor. No filter module is registered.
    pub fn default(our_node: Node) -> Self {
        let mut with_defaults = Self::new(our_node);
        with_defaults.add_module(Rings::default());
        with_defaults.add_module(Vicinity::default());
        with_defaults.add_module(Cyclon::default());
        with_defaults
    }

    /// Registers `module` under the key returned by its `name()`.
    ///
    /// A module previously registered under the same name is replaced.
    #[inline]
    pub fn add_module<M: Module + Send + Sync + 'static>(&mut self, module: M) {
        let key = module.name();
        let boxed: Box<dyn Module + Send + Sync> = Box::new(module);
        self.modules.insert(key, boxed);
    }

    /// Registers `filter_module` under the key returned by its `name()`.
    ///
    /// A filter module previously registered under the same name is replaced.
    #[inline]
    pub fn add_filter_module<FM: FilterModule + Send + Sync + 'static>(
        &mut self,
        filter_module: FM,
    ) {
        let key = filter_module.name();
        let boxed: Box<dyn FilterModule + Send + Sync> = Box::new(filter_module);
        self.filter_modules.insert(key, boxed);
    }

    /// Collects the nodes selected by the registered modules.
    ///
    /// Every module is handed the known nodes together with one shared
    /// accumulator map; the values left in that map are returned, in the
    /// map's key order.
    pub fn view(&self) -> Vec<NodeData> {
        let mut selected = BTreeMap::new();
        self.modules
            .values()
            .for_each(|module| module.view(&self.known_nodes, &mut selected));
        selected.into_iter().map(|(_, node)| node).collect()
    }

    /// Passes `nodes` through every registered filter module in turn, feeding
    /// the output of one filter into the next, and returns the final map.
    fn filter_nodes(&self, nodes: BTreeMap<Id, NodeData>) -> BTreeMap<Id, NodeData> {
        self.filter_modules
            .values()
            .fold(nodes, |remaining, filter| {
                filter.filter(&self.our_node.data(), remaining)
            })
    }

    /// Integrates `new_nodes` into the topology.
    ///
    /// The nodes are first run through the filter modules. The ids of the
    /// nodes that remain are added to the local node's `subscribers`, the
    /// nodes themselves are merged into the known nodes (an entry with an
    /// already known id is overwritten), and finally every module is updated
    /// with the local node's data and the full set of known nodes.
    pub fn update(&mut self, new_nodes: BTreeMap<Id, NodeData>) {
        let accepted = self.filter_nodes(new_nodes);

        self.our_node.data_mut().subscribers.extend(accepted.keys());
        self.known_nodes.extend(accepted);

        let known_nodes = &self.known_nodes;
        for module in self.modules.values_mut() {
            module.update(self.our_node.data(), known_nodes);
        }
    }

    /// Removes the node identified by `id` from the known nodes.
    ///
    /// Returns `None`, changing nothing, when the id is unknown. Otherwise
    /// the remaining known nodes are taken out and fed back through `update`,
    /// which re-applies the filter modules and updates every module, and the
    /// removed node's data is returned.
    ///
    /// NOTE(review): the evicted id is not removed from the local node's
    /// `subscribers` here -- confirm this is intended.
    pub fn evict_node(&mut self, id: Id) -> Option<NodeData> {
        let evicted = self.known_nodes.remove(&id)?;
        let remaining = std::mem::replace(&mut self.known_nodes, BTreeMap::new());
        self.update(remaining);
        Some(evicted)
    }

    /// Builds the set of nodes to gossip to `gossip_recipient`.
    ///
    /// The local node's `last_gossip` is set to the current system time.
    /// Only known nodes whose `address()` is `Some` are offered to the
    /// modules. The selections of all modules are merged, the recipient's own
    /// entry is removed, and the local node's data is always included.
    pub fn select_gossips(&mut self, gossip_recipient: &NodeData) -> BTreeMap<Id, NodeData> {
        self.our_node.data_mut().last_gossip = std::time::SystemTime::now();

        let reachable = self
            .known_nodes
            .iter()
            .filter(|(_, node)| node.address().is_some())
            .map(|(id, node)| (*id, node.clone()))
            .collect();

        let mut gossips = BTreeMap::new();
        for module in self.modules.values() {
            let selected =
                module.select_gossips(self.our_node.data(), gossip_recipient, &reachable);
            gossips.extend(selected);
        }

        // The recipient does not need to be told about itself, but it must
        // always learn about us.
        gossips.remove(gossip_recipient.id());
        gossips.insert(*self.our_node.data().id(), self.our_node.data().clone());
        gossips
    }
}