1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use crate::nodes::Entry;
use crate::{
    DefaultPolicy, Gossips, GossipsBuilder, Id, Layer, Node, NodeInfo, NodeProfile, Nodes, Policy,
    PolicyReport, Selection, ViewBuilder,
};

/// Local state of the gossip topology as seen from one node.
///
/// It bundles the local node's profile, the collection of nodes learned so
/// far, the layers consulted when building views and gossips, and the policy
/// handed over whenever a known node is modified.
pub struct Topology {
    /// The local node identity
    profile: NodeProfile,

    /// Nodes known to this topology; entries are inserted or refreshed from
    /// the gossips received from other nodes.
    nodes: Nodes,

    /// Registered layers, in the order they were added with `add_layer`.
    /// They are always visited in that order.
    layers: Vec<Box<dyn Layer + Send + Sync>>,

    /// Policy passed along when an existing node entry is modified.
    /// Set to `DefaultPolicy` by `new` and replaceable with `set_policy`.
    policy: Box<dyn Policy + Send + Sync>,
}

impl Topology {
    /// Creates a topology for the local node described by `profile`.
    ///
    /// The node collection starts from `Nodes::default()`, no layer is
    /// registered, and the policy is `DefaultPolicy::default()`.
    pub fn new(profile: NodeProfile) -> Self {
        Self {
            profile,
            nodes: Nodes::default(),
            layers: Vec::new(),
            policy: Box::new(DefaultPolicy::default()),
        }
    }

    /// Returns the profile of the local node.
    pub fn profile(&self) -> &NodeProfile {
        &self.profile
    }

    /// Appends `layer` after the layers already registered.
    ///
    /// Layers are visited in registration order by every operation that
    /// walks them.
    pub fn add_layer<L>(&mut self, layer: L)
    where
        L: Layer + Send + Sync + 'static,
    {
        let boxed: Box<dyn Layer + Send + Sync> = Box::new(layer);
        self.layers.push(boxed);
    }

    /// Replaces the current policy with `policy`.
    pub fn set_policy<P>(&mut self, policy: P)
    where
        P: Policy + Send + Sync + 'static,
    {
        let boxed: Box<dyn Policy + Send + Sync> = Box::new(policy);
        self.policy = boxed;
    }

    /// Builds a view of the topology for the given `selection`.
    ///
    /// When `from` is provided it is registered as the origin of the view
    /// before the layers are consulted. Every layer then gets a chance to
    /// contribute to the view builder, with mutable access to the known
    /// nodes, and the builder finally produces the list of node infos.
    pub fn view(&mut self, from: Option<Id>, selection: Selection) -> Vec<NodeInfo> {
        let mut builder = ViewBuilder::new(selection);
        if let Some(origin) = from {
            builder.with_origin(origin);
        }

        for layer in &mut self.layers {
            layer.view(&mut builder, &mut self.nodes);
        }

        builder.build(&self.nodes)
    }

    /// Records in the logs of `peer` that a gossip exchange is taking place.
    ///
    /// Nothing happens when `peer` is not a known node.
    fn mark_gossiping(&mut self, peer: Id) {
        if let Some(node) = self.nodes.get_mut(&peer) {
            node.logs_mut().gossiping();
        }
    }

    /// Merges the received `gossips` into the collection of known nodes.
    ///
    /// A gossip is skipped when it carries the local node's id, or when both
    /// it and the local profile expose an address and the two are equal.
    /// Otherwise a known node is updated through the policy and an unknown
    /// one is inserted. The sender id (`_from`) is currently unused.
    fn update_known_nodes(&mut self, _from: Id, gossips: Gossips) {
        for gossip in gossips.into_iter() {
            // ignore ourselves
            if gossip.id() == self.profile.id() {
                continue;
            }

            // address theft or we have a new Id since then
            let address_clash = match (self.profile().address(), gossip.address()) {
                (Some(mine), Some(theirs)) => mine == theirs,
                _ => false,
            };
            if address_clash {
                continue;
            }

            match self.nodes.entry(*gossip.id()) {
                Entry::Occupied(mut known) => {
                    known.modify(&mut self.policy, |node| node.update_gossip(gossip));
                }
                Entry::Vacant(mut slot) => {
                    slot.insert(Node::new(gossip));
                }
            }
        }
    }

    /// Prepares the gossips to send to the node `with`.
    ///
    /// If `with` is already known, its logs are marked as gossiping first.
    /// Each layer then contributes to a gossips builder targeting `with`,
    /// and the builder assembles the result from a clone of the local
    /// profile and the known nodes.
    pub fn initiate_gossips(&mut self, with: Id) -> Gossips {
        self.mark_gossiping(with);

        let mut builder = GossipsBuilder::new(with);
        for layer in &mut self.layers {
            layer.gossips(&self.profile, &mut builder, &self.nodes);
        }

        builder.build(self.profile.clone(), &self.nodes)
    }

    /// Processes `gossips` received from the node `from`, without replying.
    ///
    /// The sender's logs are marked before the gossips are merged, so a
    /// sender that is not yet known at that point is not marked. Once the
    /// known nodes are updated, every layer is reset and repopulated.
    pub fn accept_gossips(&mut self, from: Id, gossips: Gossips) {
        self.mark_gossiping(from);
        self.update_known_nodes(from, gossips);

        for layer in &mut self.layers {
            layer.reset();
            layer.populate(&self.profile, &self.nodes);
        }
    }

    /// Processes `gossips` received from the node `with` and builds the
    /// gossips to send back.
    ///
    /// The peer's logs are marked (if it is already known) and the received
    /// gossips are merged into the known nodes. Then, layer by layer, the
    /// layer is reset, repopulated and asked to contribute to the reply.
    pub fn exchange_gossips(&mut self, with: Id, gossips: Gossips) -> Gossips {
        self.mark_gossiping(with);
        self.update_known_nodes(with, gossips);

        let mut builder = GossipsBuilder::new(with);
        for layer in &mut self.layers {
            // Keep the three steps together per layer: a layer contributes
            // to the reply right after being repopulated.
            layer.reset();
            layer.populate(&self.profile, &self.nodes);
            layer.gossips(&self.profile, &mut builder, &self.nodes);
        }

        builder.build(self.profile.clone(), &self.nodes)
    }

    /// Applies `update` to the node identified by `id`, going through the
    /// current policy.
    ///
    /// Returns whatever the node entry's `and_modify` yields: an optional
    /// `PolicyReport` (presumably `None` when `id` is unknown; confirm
    /// against the `Nodes` entry API).
    pub fn update_node<F>(&mut self, id: Id, update: F) -> Option<PolicyReport>
    where
        F: FnOnce(&mut Node),
    {
        self.nodes.entry(id).and_modify(&mut self.policy, update)
    }
}