use std::collections::{BTreeMap, BTreeSet};
use crate::topology::Module;
use crate::{Id, InterestLevel, NodeData, Topic};
/// Number of neighbor slots kept for each topic (the length of a `TopicView`).
pub const RINGS_MAX_VIEW_SIZE: usize = 4;
/// Number of slots handed out by the predecessor accessors: half of `RINGS_MAX_VIEW_SIZE`.
pub const RINGS_NEIGHBOR_PREDECESSOR_SIZE: usize = RINGS_MAX_VIEW_SIZE / 2;
/// Number of slots handed out by the successor accessors: half of `RINGS_MAX_VIEW_SIZE`.
pub const RINGS_NEIGHBOR_SUCCESSOR_SIZE: usize = RINGS_MAX_VIEW_SIZE / 2;
/// State of the "rings" module: one `TopicView` of selected neighbors per topic.
///
/// The map is replaced wholesale each time the module is updated.
#[derive(Clone, Debug)]
pub struct Rings {
/// Neighbor slots currently selected for each topic.
pub(crate) neighbors: BTreeMap<Topic, TopicView>,
}
/// One position in a view: either occupied by a value or free.
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub(crate) enum Slot<A> {
/// The slot holds a value.
Taken(A),
/// The slot is empty and can be assigned.
Available,
}
/// Fixed-size array of neighbor slots for a single topic.
///
/// With the current constants, the first `RINGS_NEIGHBOR_PREDECESSOR_SIZE` slots are the
/// predecessor slots and the remaining `RINGS_NEIGHBOR_SUCCESSOR_SIZE` slots are the
/// successor slots.
#[derive(Clone, Debug)]
pub struct TopicView([Slot<Id>; RINGS_MAX_VIEW_SIZE]);
impl Module for Rings {
    /// Identifier of this module.
    fn name(&self) -> &'static str {
        "rings"
    }

    /// Picks the nodes worth gossiping to `gossip_recipient`.
    ///
    /// Thin wrapper around `Rings::select_nodes_to_send`.
    fn select_gossips(
        &self,
        our_node: &NodeData,
        gossip_recipient: &NodeData,
        known_nodes: &BTreeMap<Id, NodeData>,
    ) -> BTreeMap<Id, NodeData> {
        self.select_nodes_to_send(our_node, gossip_recipient, known_nodes)
    }

    /// Rebuilds the per-topic neighbor views from `known_nodes`.
    fn update(&mut self, self_node: &NodeData, known_nodes: &BTreeMap<Id, NodeData>) {
        self.update_view(self_node, known_nodes);
    }

    /// Copies into `view` the data of every node occupying a slot in any topic view.
    ///
    /// # Panics
    ///
    /// Panics if a taken slot refers to an id that is absent from `known_nodes`.
    fn view(&self, known_nodes: &BTreeMap<Id, NodeData>, view: &mut BTreeMap<Id, NodeData>) {
        let taken_ids = self
            .neighbors
            .values()
            .flat_map(|topic_view| topic_view.iter())
            .filter_map(|slot| slot.option());

        for id in taken_ids {
            match known_nodes.get(id) {
                Some(node) => {
                    view.insert(*id, node.clone());
                }
                // Slots are expected to only ever reference known nodes.
                None => unreachable!(),
            }
        }
    }
}
impl<A> Slot<A> {
    /// Returns `true` when the slot holds a value.
    pub(crate) fn is_taken(&self) -> bool {
        self.option().is_some()
    }

    /// Borrows the held value, or returns `None` for an available slot.
    pub(crate) fn option(&self) -> Option<&A> {
        if let Slot::Taken(value) = self {
            Some(value)
        } else {
            None
        }
    }
}
impl Default for TopicView {
    /// Builds a view in which every slot is available.
    fn default() -> Self {
        let slots = [Slot::Available; RINGS_MAX_VIEW_SIZE];
        TopicView(slots)
    }
}
impl Default for Rings {
    /// Builds a `Rings` with no topic views.
    fn default() -> Self {
        let neighbors = BTreeMap::new();
        Rings { neighbors }
    }
}
impl TopicView {
pub(crate) fn degree(&self) -> usize {
self.0.iter().filter(|v| v.is_taken()).count()
}
pub(crate) fn remove_node(&mut self, id: Id) -> bool {
let id = Slot::Taken(id);
for slot in self.0.iter_mut() {
if slot == &id {
*slot = Slot::Available;
return true;
}
}
false
}
pub(crate) fn contains(&self, id: Id) -> bool {
let id = Slot::Taken(id);
self.0.iter().any(|v| v == &id)
}
#[cfg(test)]
fn successors(&self) -> impl Iterator<Item = &Slot<Id>> {
self.0
.iter()
.skip(RINGS_NEIGHBOR_PREDECESSOR_SIZE)
.take(RINGS_NEIGHBOR_SUCCESSOR_SIZE)
}
pub(crate) fn successors_mut(&mut self) -> impl Iterator<Item = &mut Slot<Id>> {
self.0
.iter_mut()
.skip(RINGS_NEIGHBOR_PREDECESSOR_SIZE)
.take(RINGS_NEIGHBOR_SUCCESSOR_SIZE)
}
#[cfg(test)]
fn predecessors(&self) -> impl Iterator<Item = &Slot<Id>> {
self.0
.iter()
.rev()
.skip(RINGS_NEIGHBOR_SUCCESSOR_SIZE)
.take(RINGS_NEIGHBOR_PREDECESSOR_SIZE)
}
pub(crate) fn predecessors_mut(&mut self) -> impl Iterator<Item = &mut Slot<Id>> {
self.0
.iter_mut()
.rev()
.skip(RINGS_NEIGHBOR_SUCCESSOR_SIZE)
.take(RINGS_NEIGHBOR_PREDECESSOR_SIZE)
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &Slot<Id>> {
self.0.iter()
}
#[cfg(test)]
fn iter_mut<'a>(&'a mut self) -> impl Iterator<Item = &'a mut Slot<Id>> {
self.0.iter_mut()
}
}
impl Rings {
    /// Sets the interest level of each of `self_node`'s subscriptions from how full the
    /// matching topic view is.
    ///
    /// A topic with no free slot gets `InterestLevel::Low`, one free slot gets
    /// `InterestLevel::Normal`, and anything emptier gets `InterestLevel::High`.
    /// A topic that has no view yet gets an empty one inserted.
    pub fn update_priorities(&mut self, self_node: &mut NodeData) {
        for (topic, interest) in self_node.subscriptions.iter_mut() {
            let degree = self
                .neighbors
                .entry(*topic)
                .or_insert_with(TopicView::default)
                .degree();
            // `degree` counts slots of a `RINGS_MAX_VIEW_SIZE`-long array, so this
            // subtraction cannot underflow.
            *interest = match RINGS_MAX_VIEW_SIZE - degree {
                0 => InterestLevel::Low,
                1 => InterestLevel::Normal,
                _ => InterestLevel::High,
            };
        }
    }

    /// Removes `id` from every topic view.
    ///
    /// Returns `true` if the node was present in at least one view.
    pub fn remove_node(&mut self, id: Id) -> bool {
        let mut removed = false;
        for view in self.neighbors.values_mut() {
            // `|=` evaluates the call unconditionally; `removed || ...` would
            // short-circuit after the first hit and leave the id in later views.
            removed |= view.remove_node(id);
        }
        removed
    }

    /// Returns `true` if `id` occupies a slot in any topic view.
    pub fn contains(&self, id: Id) -> bool {
        self.neighbors.values().any(|view| view.contains(id))
    }

    /// Total number of occupied slots across all topic views.
    pub fn degree(&self) -> usize {
        self.neighbors.values().map(|view| view.degree()).sum()
    }

    /// Discards the current views and selects fresh neighbors, among `known_nodes`,
    /// for every topic `self_node` subscribes to.
    fn update_view(&mut self, self_node: &NodeData, known_nodes: &BTreeMap<Id, NodeData>) {
        self.neighbors = BTreeMap::new();
        for topic in self_node.subscriptions.topics() {
            let view = select_best_nodes_for_topic(*self_node.id(), *topic, known_nodes);
            self.neighbors.insert(*topic, view);
        }
    }

    /// Selects the nodes to gossip about to `gossip_node`.
    ///
    /// Candidates are the `known_nodes` whose id is reported by
    /// `self_node.common_subscribers(gossip_node)` and that subscribe to at least one
    /// topic shared by `self_node` and `gossip_node`. For each shared topic, the
    /// neighbors of `gossip_node` are picked among those candidates; the result is the
    /// union over all shared topics.
    fn select_nodes_to_send(
        &self,
        self_node: &NodeData,
        gossip_node: &NodeData,
        known_nodes: &BTreeMap<Id, NodeData>,
    ) -> BTreeMap<Id, NodeData> {
        let common_topics: BTreeSet<Topic> = self_node
            .common_subscriptions(gossip_node)
            .cloned()
            .collect();
        let common_subscribers: BTreeSet<Id> = self_node
            .common_subscribers(&gossip_node)
            .cloned()
            .collect();

        let candidates: BTreeMap<Id, NodeData> = known_nodes
            .iter()
            .filter(|(k, v)| {
                common_subscribers.contains(k)
                    && v.subscriptions.topics().any(|k| common_topics.contains(k))
            })
            .map(|(k, v)| (*k, v.clone()))
            .collect();

        let mut nodes = BTreeMap::new();
        for topic in common_topics {
            // The view is computed from the recipient's position, not ours.
            let view = select_best_nodes_for_topic(*gossip_node.id(), topic, &candidates);
            for candidate in view.iter().filter_map(|v| v.option()) {
                if let Some(node) = candidates.get(candidate) {
                    nodes.insert(*candidate, node.clone());
                }
            }
        }
        nodes
    }
}
/// Builds the view of the nodes closest to `other_id` among the `candidates`
/// subscribed to `topic`.
///
/// Predecessor slots are filled with the subscribed candidates whose id is below
/// `other_id`, nearest first. Successor slots are filled with the subscribed
/// candidates whose id is above `other_id`, nearest first. `other_id` itself is never
/// selected, and slots stay `Slot::Available` when there are not enough candidates on
/// a side. No wrap-around is performed between the two ends of the id space.
fn select_best_nodes_for_topic(
    other_id: Id,
    topic: Topic,
    candidates: &BTreeMap<Id, NodeData>,
) -> TopicView {
    use std::ops::Bound::{self, Excluded, Included};

    let mut view = TopicView::default();

    {
        // Walk downwards from `other_id` so the nearest predecessor comes first.
        let below = candidates
            .range((Included(Id::zero()), Excluded(other_id)))
            .rev()
            .filter(|(_, candidate)| candidate.subscriptions.contains(topic));
        for (slot, (id, _)) in view.predecessors_mut().zip(below) {
            *slot = Slot::Taken(*id);
        }
    }

    {
        // Walk upwards from `other_id` so the nearest successor comes first.
        // Reversing this range would select the largest ids instead.
        let above = candidates
            .range((Excluded(other_id), Bound::Unbounded))
            .filter(|(_, candidate)| candidate.subscriptions.contains(topic));
        for (slot, (id, _)) in view.successors_mut().zip(above) {
            *slot = Slot::Taken(*id);
        }
    }

    view
}
#[cfg(test)]
mod test {
    use super::*;
    use quickcheck::{Arbitrary, Gen};

    /// Generates a slot from an arbitrary `Option<A>`: `Some` maps to `Taken`,
    /// `None` maps to `Available`.
    impl<A: Arbitrary> Arbitrary for Slot<A> {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let maybe: Option<A> = Arbitrary::arbitrary(g);
            maybe.map_or(Slot::Available, Slot::Taken)
        }
    }

    /// Generates a view in which every slot is filled independently.
    impl Arbitrary for TopicView {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let mut view = TopicView::default();
            for slot in view.iter_mut() {
                *slot = Arbitrary::arbitrary(g);
            }
            view
        }
    }

    quickcheck! {
        /// The predecessors (put back in storage order) followed by the successors
        /// must be exactly the slots of the view.
        fn predecessors_and_successors_is_all(view: TopicView) -> bool {
            let expected: Vec<_> = view.iter().cloned().collect();

            let mut rebuilt: Vec<_> = view.predecessors().cloned().collect();
            rebuilt.reverse();
            rebuilt.extend(view.successors().cloned());

            // Compare whole sequences: zipping would stop at the shorter one and
            // hide missing slots.
            expected == rebuilt
        }
    }
}