plumtree/
node.rs

1use std::collections::{HashMap, HashSet};
2use std::fmt;
3use std::time::Duration;
4
5use action::{Action, ActionQueue};
6use message::{GossipMessage, GraftMessage, IhaveMessage, Message, ProtocolMessage, PruneMessage};
7use missing::MissingMessages;
8use time::{Clock, NodeTime};
9use System;
10
/// Configuration parameters of a Plumtree [Node].
///
/// [Node]: ./struct.Node.html
#[derive(Debug, Clone)]
pub struct NodeOptions {
    /// Timeout duration of an `IhaveMessage`.
    ///
    /// When a node receives an `IhaveMessage`, the expiry time of that
    /// message is set to `ihave_timeout` after the moment of reception.
    ///
    /// If the associated `GossipMessage` has not been received by the time
    /// it expires, the node sends a `GraftMessage` to the sender of the
    /// `IhaveMessage` in order to retrieve the payload of the message.
    ///
    /// The default value is `Duration::from_millis(500)`.
    pub ihave_timeout: Duration,

    /// Optimization threshold.
    ///
    /// Refer to "3.8. Optimization" of the [paper] for the meaning of this parameter.
    ///
    /// The default value is `2`.
    ///
    /// [paper]: http://www.gsd.inesc-id.pt/~ler/reports/srds07.pdf
    pub optimization_threshold: u16,
}
impl Default for NodeOptions {
    /// Returns the options documented as defaults on each field:
    /// a 500 millisecond `ihave_timeout` and an `optimization_threshold` of `2`.
    fn default() -> Self {
        let ihave_timeout = Duration::from_millis(500);
        let optimization_threshold = 2;
        Self {
            ihave_timeout,
            optimization_threshold,
        }
    }
}
45
46/// Plumtree node.
47///
48/// # User's responsibility
49///
50/// For running a node correctly, you have to call the following methods appropriately:
51///
52/// - [`poll_action`]
53/// - [`forget_message`]
54/// - [`handle_protocol_message`]
55/// - [`handle_neighbor_up`]
56/// - [`handle_neighbor_down`]
57/// - [`clock_mut`]
58///
59/// For details, refer to the document of each method.
60///
61/// [`poll_action`]: ./struct.Node.html#method.poll_action
62/// [`forget_message`]: ./struct.Node.html#method.forget_message
63/// [`handle_protocol_message`]: ./struct.Node.html#method.handle_protocol_message
64/// [`handle_neighbor_up`]: ./struct.Node.html#method.handle_neighbor_up
65/// [`handle_neighbor_down`]: ./struct.Node.html#method.handle_neighbor_down
66/// [`clock_mut`]: ./struct.Node.html#method.clock_mut
67pub struct Node<T: System> {
68    id: T::NodeId,
69    options: NodeOptions,
70    eager_push_peers: HashSet<T::NodeId>,
71    lazy_push_peers: HashSet<T::NodeId>,
72    messages: HashMap<T::MessageId, T::MessagePayload>,
73    missings: MissingMessages<T>,
74    actions: ActionQueue<T>,
75    clock: Clock,
76}
77impl<T: System> fmt::Debug for Node<T>
78where
79    T::NodeId: fmt::Debug,
80    T::MessageId: fmt::Debug,
81    T::MessagePayload: fmt::Debug,
82{
83    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84        write!(
85            f,
86            "Node {{ id: {:?}, options: {:?}, eager_push_peers: {:?}, lazy_push_peers: {:?}, \
87             messages: {:?}, missings: {:?}, actions: {:?}, clock: {:?} }}",
88            self.id,
89            self.options,
90            self.eager_push_peers,
91            self.lazy_push_peers,
92            self.messages,
93            self.missings,
94            self.actions,
95            self.clock
96        )
97    }
98}
99impl<T: System> Node<T> {
100    /// Makes a new `Node` instance.
101    pub fn new(node_id: T::NodeId) -> Self {
102        Self::with_options(node_id, NodeOptions::default())
103    }
104
105    /// Makes a new `Node` instance with the given options.
106    pub fn with_options(node_id: T::NodeId, options: NodeOptions) -> Self {
107        Node {
108            id: node_id,
109            options,
110            eager_push_peers: HashSet::new(),
111            lazy_push_peers: HashSet::new(),
112            messages: HashMap::new(),
113            missings: MissingMessages::new(),
114            actions: ActionQueue::new(),
115            clock: Clock::new(),
116        }
117    }
118
119    /// Returns the identifier of the node.
120    pub fn id(&self) -> &T::NodeId {
121        &self.id
122    }
123
124    /// Returns the options of the node.
125    pub fn options(&self) -> &NodeOptions {
126        &self.options
127    }
128
129    /// Returns a mutable reference to the options of the node.
130    pub fn options_mut(&mut self) -> &mut NodeOptions {
131        &mut self.options
132    }
133
134    /// Returns the peers with which the node uses eager push gossip for diffusing application messages.
135    pub fn eager_push_peers(&self) -> &HashSet<T::NodeId> {
136        &self.eager_push_peers
137    }
138
139    /// Returns the peers with which the node uses lazy push gossip for diffusing application messages.
140    pub fn lazy_push_peers(&self) -> &HashSet<T::NodeId> {
141        &self.lazy_push_peers
142    }
143
144    /// Broadcasts the given message.
145    pub fn broadcast_message(&mut self, message: Message<T>) {
146        self.actions.deliver(message.clone());
147
148        let gossip = GossipMessage::new(&self.id, message, 0);
149        self.eager_push(&gossip);
150        self.lazy_push(&gossip);
151        self.messages
152            .insert(gossip.message.id, gossip.message.payload);
153    }
154
155    /// Returns a reference to the messages kept by the node.
156    pub fn messages(&self) -> &HashMap<T::MessageId, T::MessagePayload> {
157        &self.messages
158    }
159
160    /// Returns the number of messages waiting to be received.
161    ///
162    /// Roughly speaking, it indicates the approximate number of `IHAVE` messages held by the node.
163    pub fn waiting_messages(&self) -> usize {
164        self.missings.waiting_messages()
165    }
166
167    /// Forgets the specified message.
168    ///
169    /// If the node does not have the target message, this method will return `false`.
170    ///
171    /// For preventing memory shortage, this method needs to be called appropriately.
172    pub fn forget_message(&mut self, message_id: &T::MessageId) -> bool {
173        self.messages.remove(message_id).is_some()
174    }
175
176    /// Polls the next action that the node wants to execute.
177    pub fn poll_action(&mut self) -> Option<Action<T>> {
178        self.handle_expiration();
179        self.actions.pop()
180    }
181
182    /// Handles the given incoming message.
183    ///
184    /// This method will return `false` if the sender of the message is not a neighbor of this node.
185    pub fn handle_protocol_message(&mut self, message: ProtocolMessage<T>) -> bool {
186        if !self.is_known_node(message.sender()) {
187            return false;
188        }
189        match message {
190            ProtocolMessage::Gossip(m) => self.handle_gossip(m),
191            ProtocolMessage::Ihave(m) => self.handle_ihave(m),
192            ProtocolMessage::Graft(m) => self.handle_graft(m),
193            ProtocolMessage::Prune(m) => self.handle_prune(m),
194        }
195        true
196    }
197
198    /// Accepts new neighbor.
199    pub fn handle_neighbor_up(&mut self, neighbor_node_id: &T::NodeId) {
200        if self.is_known_node(neighbor_node_id) || self.id == *neighbor_node_id {
201            return;
202        }
203        for message_id in self.messages.keys() {
204            let ihave = IhaveMessage::new(&self.id, message_id.clone(), 0, false);
205            self.actions.send(neighbor_node_id.clone(), ihave);
206        }
207        self.eager_push_peers.insert(neighbor_node_id.clone());
208    }
209
210    /// Removes downed neighbor.
211    pub fn handle_neighbor_down(&mut self, neighbor_node_id: &T::NodeId) {
212        if !self.is_known_node(neighbor_node_id) {
213            return;
214        }
215        self.eager_push_peers.remove(neighbor_node_id);
216        self.lazy_push_peers.remove(neighbor_node_id);
217
218        if self.eager_push_peers.is_empty() {
219            while let Some(ihave) = self.missings.pop_expired(&Clock::max()) {
220                if self.send_graft(ihave) {
221                    break;
222                }
223            }
224        }
225    }
226
227    /// Returns a reference to the clock of the node.
228    pub fn clock(&self) -> &Clock {
229        &self.clock
230    }
231
232    /// Returns a mutable reference to the clock of the node.
233    ///
234    /// Note that for handling `IHAVE` messages correctly,
235    /// you have to proceed the time of the node by calling [`Clock::tick`] method.
236    ///
237    /// [`Clock::tick`]: ./time/struct.Clock.html#method.tick
238    pub fn clock_mut(&mut self) -> &mut Clock {
239        &mut self.clock
240    }
241
242    /// Returns the nearest time when the timeout of a `IHAVE` message expires.
243    ///
244    /// If the node has no `IHAVE` messages to be handled, this method will return `None`.
245    pub fn next_expiry_time(&self) -> Option<NodeTime> {
246        self.missings.next_expiry_time()
247    }
248
249    fn handle_expiration(&mut self) {
250        while let Some(ihave) = self.missings.pop_expired(&self.clock) {
251            self.send_graft(ihave);
252        }
253    }
254
255    fn send_graft(&mut self, ihave: IhaveMessage<T>) -> bool {
256        if !self.is_known_node(&ihave.sender) {
257            // The node has been removed from neighbors
258            false
259        } else {
260            self.eager_push_peers.insert(ihave.sender.clone());
261            self.lazy_push_peers.remove(&ihave.sender);
262            self.actions.send(
263                ihave.sender,
264                GraftMessage::new(&self.id, Some(ihave.message_id), ihave.round),
265            );
266            true
267        }
268    }
269
270    #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
271    fn handle_gossip(&mut self, gossip: GossipMessage<T>) {
272        if self.messages.contains_key(&gossip.message.id) {
273            self.eager_push_peers.remove(&gossip.sender);
274            self.lazy_push_peers.insert(gossip.sender.clone());
275            self.actions
276                .send(gossip.sender, PruneMessage::new(&self.id));
277        } else {
278            self.actions.deliver(gossip.message.clone());
279
280            self.eager_push(&gossip);
281            self.lazy_push(&gossip);
282            self.eager_push_peers.insert(gossip.sender.clone());
283            self.lazy_push_peers.remove(&gossip.sender);
284
285            self.optimize(&gossip);
286            self.missings.remove(&gossip.message.id);
287            self.messages
288                .insert(gossip.message.id, gossip.message.payload);
289        }
290    }
291
292    fn handle_ihave(&mut self, mut ihave: IhaveMessage<T>) {
293        if self.messages.contains_key(&ihave.message_id) {
294            return;
295        }
296        if self.eager_push_peers.is_empty() {
297            ihave.realtime = true;
298        }
299        self.missings
300            .push(ihave, &self.clock, self.options.ihave_timeout);
301    }
302
303    fn handle_graft(&mut self, mut graft: GraftMessage<T>) {
304        self.eager_push_peers.insert(graft.sender.clone());
305        self.lazy_push_peers.remove(&graft.sender);
306        if let Some(message_id) = graft.message_id.take() {
307            if let Some(payload) = self.messages.get(&message_id).cloned() {
308                let gossip =
309                    GossipMessage::new(&self.id, Message::new(message_id, payload), graft.round);
310                self.actions.send(graft.sender, gossip);
311            }
312        }
313    }
314
315    fn handle_prune(&mut self, prune: PruneMessage<T>) {
316        self.eager_push_peers.remove(&prune.sender);
317        self.lazy_push_peers.insert(prune.sender);
318    }
319
320    fn eager_push(&mut self, gossip: &GossipMessage<T>) {
321        let round = gossip.round.saturating_add(1);
322        for peer in self.eager_push_peers
323            .iter()
324            .filter(|n| **n != gossip.sender)
325        {
326            let forward = GossipMessage::new(&self.id, gossip.message.clone(), round);
327            self.actions.send(peer.clone(), forward);
328        }
329    }
330
331    fn lazy_push(&mut self, gossip: &GossipMessage<T>) {
332        let round = gossip.round.saturating_add(1);
333        let ihave = IhaveMessage::new(&self.id, gossip.message.id.clone(), round, true);
334        for peer in self.lazy_push_peers.iter().filter(|n| **n != gossip.sender) {
335            self.actions.send(peer.clone(), ihave.clone());
336        }
337    }
338
339    fn optimize(&mut self, gossip: &GossipMessage<T>) {
340        if let Some((ihave_round, ihave_owner)) = self.missings.get_ihave(&gossip.message.id) {
341            let optimize =
342                gossip.round.checked_sub(ihave_round) >= Some(self.options.optimization_threshold);
343            if optimize {
344                let graft = GraftMessage::new(&self.id, None, ihave_round);
345                let prune = PruneMessage::new(&self.id);
346                self.actions.send(ihave_owner.clone(), graft);
347                self.actions.send(gossip.sender.clone(), prune);
348            }
349        }
350    }
351
352    fn is_known_node(&self, node_id: &T::NodeId) -> bool {
353        self.eager_push_peers.contains(node_id) || self.lazy_push_peers.contains(node_id)
354    }
355}