1use std::collections::{HashMap, HashSet};
2use std::fmt;
3use std::time::Duration;
4
5use action::{Action, ActionQueue};
6use message::{GossipMessage, GraftMessage, IhaveMessage, Message, ProtocolMessage, PruneMessage};
7use missing::MissingMessages;
8use time::{Clock, NodeTime};
9use System;
10
11#[derive(Debug, Clone)]
15pub struct NodeOptions {
16 pub ihave_timeout: Duration,
27
28 pub optimization_threshold: u16,
36}
37impl Default for NodeOptions {
38 fn default() -> Self {
39 NodeOptions {
40 ihave_timeout: Duration::from_millis(500),
41 optimization_threshold: 2,
42 }
43 }
44}
45
46pub struct Node<T: System> {
68 id: T::NodeId,
69 options: NodeOptions,
70 eager_push_peers: HashSet<T::NodeId>,
71 lazy_push_peers: HashSet<T::NodeId>,
72 messages: HashMap<T::MessageId, T::MessagePayload>,
73 missings: MissingMessages<T>,
74 actions: ActionQueue<T>,
75 clock: Clock,
76}
77impl<T: System> fmt::Debug for Node<T>
78where
79 T::NodeId: fmt::Debug,
80 T::MessageId: fmt::Debug,
81 T::MessagePayload: fmt::Debug,
82{
83 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84 write!(
85 f,
86 "Node {{ id: {:?}, options: {:?}, eager_push_peers: {:?}, lazy_push_peers: {:?}, \
87 messages: {:?}, missings: {:?}, actions: {:?}, clock: {:?} }}",
88 self.id,
89 self.options,
90 self.eager_push_peers,
91 self.lazy_push_peers,
92 self.messages,
93 self.missings,
94 self.actions,
95 self.clock
96 )
97 }
98}
99impl<T: System> Node<T> {
100 pub fn new(node_id: T::NodeId) -> Self {
102 Self::with_options(node_id, NodeOptions::default())
103 }
104
105 pub fn with_options(node_id: T::NodeId, options: NodeOptions) -> Self {
107 Node {
108 id: node_id,
109 options,
110 eager_push_peers: HashSet::new(),
111 lazy_push_peers: HashSet::new(),
112 messages: HashMap::new(),
113 missings: MissingMessages::new(),
114 actions: ActionQueue::new(),
115 clock: Clock::new(),
116 }
117 }
118
119 pub fn id(&self) -> &T::NodeId {
121 &self.id
122 }
123
124 pub fn options(&self) -> &NodeOptions {
126 &self.options
127 }
128
129 pub fn options_mut(&mut self) -> &mut NodeOptions {
131 &mut self.options
132 }
133
134 pub fn eager_push_peers(&self) -> &HashSet<T::NodeId> {
136 &self.eager_push_peers
137 }
138
139 pub fn lazy_push_peers(&self) -> &HashSet<T::NodeId> {
141 &self.lazy_push_peers
142 }
143
144 pub fn broadcast_message(&mut self, message: Message<T>) {
146 self.actions.deliver(message.clone());
147
148 let gossip = GossipMessage::new(&self.id, message, 0);
149 self.eager_push(&gossip);
150 self.lazy_push(&gossip);
151 self.messages
152 .insert(gossip.message.id, gossip.message.payload);
153 }
154
155 pub fn messages(&self) -> &HashMap<T::MessageId, T::MessagePayload> {
157 &self.messages
158 }
159
160 pub fn waiting_messages(&self) -> usize {
164 self.missings.waiting_messages()
165 }
166
167 pub fn forget_message(&mut self, message_id: &T::MessageId) -> bool {
173 self.messages.remove(message_id).is_some()
174 }
175
176 pub fn poll_action(&mut self) -> Option<Action<T>> {
178 self.handle_expiration();
179 self.actions.pop()
180 }
181
182 pub fn handle_protocol_message(&mut self, message: ProtocolMessage<T>) -> bool {
186 if !self.is_known_node(message.sender()) {
187 return false;
188 }
189 match message {
190 ProtocolMessage::Gossip(m) => self.handle_gossip(m),
191 ProtocolMessage::Ihave(m) => self.handle_ihave(m),
192 ProtocolMessage::Graft(m) => self.handle_graft(m),
193 ProtocolMessage::Prune(m) => self.handle_prune(m),
194 }
195 true
196 }
197
198 pub fn handle_neighbor_up(&mut self, neighbor_node_id: &T::NodeId) {
200 if self.is_known_node(neighbor_node_id) || self.id == *neighbor_node_id {
201 return;
202 }
203 for message_id in self.messages.keys() {
204 let ihave = IhaveMessage::new(&self.id, message_id.clone(), 0, false);
205 self.actions.send(neighbor_node_id.clone(), ihave);
206 }
207 self.eager_push_peers.insert(neighbor_node_id.clone());
208 }
209
210 pub fn handle_neighbor_down(&mut self, neighbor_node_id: &T::NodeId) {
212 if !self.is_known_node(neighbor_node_id) {
213 return;
214 }
215 self.eager_push_peers.remove(neighbor_node_id);
216 self.lazy_push_peers.remove(neighbor_node_id);
217
218 if self.eager_push_peers.is_empty() {
219 while let Some(ihave) = self.missings.pop_expired(&Clock::max()) {
220 if self.send_graft(ihave) {
221 break;
222 }
223 }
224 }
225 }
226
227 pub fn clock(&self) -> &Clock {
229 &self.clock
230 }
231
232 pub fn clock_mut(&mut self) -> &mut Clock {
239 &mut self.clock
240 }
241
242 pub fn next_expiry_time(&self) -> Option<NodeTime> {
246 self.missings.next_expiry_time()
247 }
248
249 fn handle_expiration(&mut self) {
250 while let Some(ihave) = self.missings.pop_expired(&self.clock) {
251 self.send_graft(ihave);
252 }
253 }
254
255 fn send_graft(&mut self, ihave: IhaveMessage<T>) -> bool {
256 if !self.is_known_node(&ihave.sender) {
257 false
259 } else {
260 self.eager_push_peers.insert(ihave.sender.clone());
261 self.lazy_push_peers.remove(&ihave.sender);
262 self.actions.send(
263 ihave.sender,
264 GraftMessage::new(&self.id, Some(ihave.message_id), ihave.round),
265 );
266 true
267 }
268 }
269
270 #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
271 fn handle_gossip(&mut self, gossip: GossipMessage<T>) {
272 if self.messages.contains_key(&gossip.message.id) {
273 self.eager_push_peers.remove(&gossip.sender);
274 self.lazy_push_peers.insert(gossip.sender.clone());
275 self.actions
276 .send(gossip.sender, PruneMessage::new(&self.id));
277 } else {
278 self.actions.deliver(gossip.message.clone());
279
280 self.eager_push(&gossip);
281 self.lazy_push(&gossip);
282 self.eager_push_peers.insert(gossip.sender.clone());
283 self.lazy_push_peers.remove(&gossip.sender);
284
285 self.optimize(&gossip);
286 self.missings.remove(&gossip.message.id);
287 self.messages
288 .insert(gossip.message.id, gossip.message.payload);
289 }
290 }
291
292 fn handle_ihave(&mut self, mut ihave: IhaveMessage<T>) {
293 if self.messages.contains_key(&ihave.message_id) {
294 return;
295 }
296 if self.eager_push_peers.is_empty() {
297 ihave.realtime = true;
298 }
299 self.missings
300 .push(ihave, &self.clock, self.options.ihave_timeout);
301 }
302
303 fn handle_graft(&mut self, mut graft: GraftMessage<T>) {
304 self.eager_push_peers.insert(graft.sender.clone());
305 self.lazy_push_peers.remove(&graft.sender);
306 if let Some(message_id) = graft.message_id.take() {
307 if let Some(payload) = self.messages.get(&message_id).cloned() {
308 let gossip =
309 GossipMessage::new(&self.id, Message::new(message_id, payload), graft.round);
310 self.actions.send(graft.sender, gossip);
311 }
312 }
313 }
314
315 fn handle_prune(&mut self, prune: PruneMessage<T>) {
316 self.eager_push_peers.remove(&prune.sender);
317 self.lazy_push_peers.insert(prune.sender);
318 }
319
320 fn eager_push(&mut self, gossip: &GossipMessage<T>) {
321 let round = gossip.round.saturating_add(1);
322 for peer in self.eager_push_peers
323 .iter()
324 .filter(|n| **n != gossip.sender)
325 {
326 let forward = GossipMessage::new(&self.id, gossip.message.clone(), round);
327 self.actions.send(peer.clone(), forward);
328 }
329 }
330
331 fn lazy_push(&mut self, gossip: &GossipMessage<T>) {
332 let round = gossip.round.saturating_add(1);
333 let ihave = IhaveMessage::new(&self.id, gossip.message.id.clone(), round, true);
334 for peer in self.lazy_push_peers.iter().filter(|n| **n != gossip.sender) {
335 self.actions.send(peer.clone(), ihave.clone());
336 }
337 }
338
339 fn optimize(&mut self, gossip: &GossipMessage<T>) {
340 if let Some((ihave_round, ihave_owner)) = self.missings.get_ihave(&gossip.message.id) {
341 let optimize =
342 gossip.round.checked_sub(ihave_round) >= Some(self.options.optimization_threshold);
343 if optimize {
344 let graft = GraftMessage::new(&self.id, None, ihave_round);
345 let prune = PruneMessage::new(&self.id);
346 self.actions.send(ihave_owner.clone(), graft);
347 self.actions.send(gossip.sender.clone(), prune);
348 }
349 }
350 }
351
352 fn is_known_node(&self, node_id: &T::NodeId) -> bool {
353 self.eager_push_peers.contains(node_id) || self.lazy_push_peers.contains(node_id)
354 }
355}