// sc_peerset/lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be
20//! connected to.
21//!
22//! The PSM handles *sets* of nodes. A set of nodes is defined as the nodes that are believed to
23//! support a certain capability, such as handling blocks and transactions of a specific chain,
24//! or collating a certain parachain.
25//!
26//! For each node in each set, the peerset holds a flag specifying whether the node is
27//! connected to us or not.
28//!
29//! This connected/disconnected status is specific to the node and set combination, and it is for
30//! example possible for a node to be connected through a specific set but not another.
31//!
//! In addition, for each set, the peerset also holds a list of reserved nodes towards which it
//! will at all times try to maintain a connection.
34
35mod peersstate;
36
37use futures::{channel::oneshot, prelude::*};
38use log::{debug, error, trace};
39use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
40use serde_json::json;
41use std::{
42	collections::{HashMap, HashSet, VecDeque},
43	pin::Pin,
44	task::{Context, Poll},
45	time::{Duration, Instant},
46};
47use wasm_timer::Delay;
48
49pub use libp2p::PeerId;
50
/// We don't accept nodes whose reputation is under this value.
///
/// Computed as 82% of `i32::MIN`: a node is considered banned once its reputation drops below
/// -82% of the negative range.
pub const BANNED_THRESHOLD: i32 = 82 * (i32::MIN / 100);
/// Reputation change for a node when we get disconnected from it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -256;
/// Amount of time between the moment we disconnect from a node and the moment we remove it from
/// the list.
const FORGET_AFTER: Duration = Duration::from_secs(3600);
58
/// Commands sent from a [`PeersetHandle`] to the background [`Peerset`] over the internal
/// channel. Each variant corresponds to one method of [`PeersetHandle`].
#[derive(Debug)]
enum Action {
	/// Mark a peer as reserved for the given set.
	AddReservedPeer(SetId, PeerId),
	/// Remove the reserved marking of a peer for the given set.
	RemoveReservedPeer(SetId, PeerId),
	/// Replace the whole list of reserved peers of the given set.
	SetReservedPeers(SetId, HashSet<PeerId>),
	/// Enable or disable reserved-only mode for the given set.
	SetReservedOnly(SetId, bool),
	/// Apply a reputation adjustment to a peer (reputation is global across sets).
	ReportPeer(PeerId, ReputationChange),
	/// Add a peer to the given set.
	AddToPeersSet(SetId, PeerId),
	/// Remove a non-reserved peer from the given set.
	RemoveFromPeersSet(SetId, PeerId),
	/// Query the current reputation of a peer; the answer is sent back on the sender.
	PeerReputation(PeerId, oneshot::Sender<i32>),
}
70
/// Identifier of a set in the peerset.
///
/// Can be constructed using the `From<usize>` trait implementation based on the index of the set
/// within [`PeersetConfig::sets`]. For example, the first element of [`PeersetConfig::sets`] is
/// later referred to with `SetId::from(0)`. It is intended that the code responsible for building
/// the [`PeersetConfig`] is also responsible for constructing the [`SetId`]s.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);

impl SetId {
	/// `const`-friendly constructor, usable in contexts where `From::from` cannot be called.
	pub const fn from(id: usize) -> Self {
		SetId(id)
	}
}

impl From<usize> for SetId {
	fn from(id: usize) -> Self {
		SetId(id)
	}
}

impl From<SetId> for usize {
	fn from(id: SetId) -> Self {
		let SetId(index) = id;
		index
	}
}
97
/// Description of a reputation adjustment for a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReputationChange {
	/// Reputation delta.
	pub value: i32,
	/// Reason for reputation change.
	pub reason: &'static str,
}

impl ReputationChange {
	/// New reputation change with given delta and reason.
	pub const fn new(value: i32, reason: &'static str) -> ReputationChange {
		ReputationChange { value, reason }
	}

	/// New reputation change that forces minimum possible reputation.
	pub const fn new_fatal(reason: &'static str) -> ReputationChange {
		ReputationChange { value: i32::MIN, reason }
	}
}
118
/// Shared handle to the peer set manager (PSM). Distributed around the code.
///
/// Cheap to clone: every clone pushes [`Action`]s onto the same unbounded channel, which is
/// drained by the [`Peerset`] when it is polled.
#[derive(Debug, Clone)]
pub struct PeersetHandle {
	/// Sending side of the channel consumed by the [`Peerset`].
	tx: TracingUnboundedSender<Action>,
}
124
125impl PeersetHandle {
126	/// Adds a new reserved peer. The peerset will make an effort to always remain connected to
127	/// this peer.
128	///
129	/// Has no effect if the node was already a reserved peer.
130	///
131	/// > **Note**: Keep in mind that the networking has to know an address for this node,
132	/// > otherwise it will not be able to connect to it.
133	pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) {
134		let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id));
135	}
136
137	/// Remove a previously-added reserved peer.
138	///
139	/// Has no effect if the node was not a reserved peer.
140	pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) {
141		let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id));
142	}
143
144	/// Sets whether or not the peerset only has connections with nodes marked as reserved for
145	/// the given set.
146	pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) {
147		let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved));
148	}
149
150	/// Set reserved peers to the new set.
151	pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet<PeerId>) {
152		let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids));
153	}
154
155	/// Reports an adjustment to the reputation of the given peer.
156	pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) {
157		let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
158	}
159
160	/// Add a peer to a set.
161	pub fn add_to_peers_set(&self, set_id: SetId, peer_id: PeerId) {
162		let _ = self.tx.unbounded_send(Action::AddToPeersSet(set_id, peer_id));
163	}
164
165	/// Remove a peer from a set.
166	pub fn remove_from_peers_set(&self, set_id: SetId, peer_id: PeerId) {
167		let _ = self.tx.unbounded_send(Action::RemoveFromPeersSet(set_id, peer_id));
168	}
169
170	/// Returns the reputation value of the peer.
171	pub async fn peer_reputation(self, peer_id: PeerId) -> Result<i32, ()> {
172		let (tx, rx) = oneshot::channel();
173
174		let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx));
175
176		// The channel can only be closed if the peerset no longer exists.
177		rx.await.map_err(|_| ())
178	}
179}
180
/// Message that can be sent by the peer set manager (PSM).
#[derive(Debug, PartialEq)]
pub enum Message {
	/// Request to open a connection to the given peer. From the point of view of the PSM, we are
	/// immediately connected.
	Connect {
		/// Set the peer belongs to.
		set_id: SetId,
		/// Peer to connect to.
		peer_id: PeerId,
	},

	/// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`.
	Drop {
		/// Set the peer belongs to.
		set_id: SetId,
		/// Peer to disconnect from.
		peer_id: PeerId,
	},

	/// Equivalent to `Connect` for the peer corresponding to this incoming index.
	Accept(IncomingIndex),

	/// Equivalent to `Drop` for the peer corresponding to this incoming index.
	Reject(IncomingIndex),
}
205
/// Opaque identifier for an incoming connection. Allocated by the network.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct IncomingIndex(pub u64);

impl From<u64> for IncomingIndex {
	fn from(val: u64) -> Self {
		IncomingIndex(val)
	}
}
215
/// Configuration to pass when creating the peer set manager.
#[derive(Debug)]
pub struct PeersetConfig {
	/// List of sets of nodes the peerset manages.
	///
	/// The index of a set in this list is the value later used to construct the corresponding
	/// [`SetId`].
	pub sets: Vec<SetConfig>,
}
222
/// Configuration for a single set of nodes.
#[derive(Debug)]
pub struct SetConfig {
	/// Maximum number of ingoing links to peers.
	pub in_peers: u32,

	/// Maximum number of outgoing links to peers.
	pub out_peers: u32,

	/// List of bootstrap nodes to initialize the set with.
	///
	/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
	/// > otherwise it will not be able to connect to them.
	pub bootnodes: Vec<PeerId>,

	/// Lists of nodes we should always be connected to.
	///
	/// Reserved nodes are flagged as not occupying any of the `in_peers`/`out_peers` slots.
	///
	/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
	/// >			otherwise it will not be able to connect to them.
	pub reserved_nodes: HashSet<PeerId>,

	/// If true, we only accept nodes in [`SetConfig::reserved_nodes`].
	pub reserved_only: bool,
}
247
/// Side of the peer set manager owned by the network. In other words, the "receiving" side.
///
/// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never
/// errors.
#[derive(Debug)]
pub struct Peerset {
	/// Underlying data structure for the nodes' states.
	data: peersstate::PeersState,
	/// For each set, lists of nodes that don't occupy slots and that we should try to always be
	/// connected to, and whether only reserved nodes are accepted. Is kept in sync with the list
	/// of non-slot-occupying nodes in [`Peerset::data`].
	reserved_nodes: Vec<(HashSet<PeerId>, bool)>,
	/// Receiver for messages from the `PeersetHandle` and from `tx`.
	rx: TracingUnboundedReceiver<Action>,
	/// Sending side of `rx`. Also cloned into every [`PeersetHandle`].
	tx: TracingUnboundedSender<Action>,
	/// Queue of messages to be emitted when the `Peerset` is polled.
	message_queue: VecDeque<Message>,
	/// When the `Peerset` was created. Used as the epoch for reputation-decay bookkeeping.
	created: Instant,
	/// Last time when we updated the reputations of connected nodes.
	latest_time_update: Instant,
	/// Next time to do a periodic call to `alloc_slots` with all sets. This is done once per
	/// second, to match the period of the reputation updates.
	next_periodic_alloc_slots: Delay,
}
274
275impl Peerset {
	/// Builds a new peerset from the given configuration.
	///
	/// Returns the [`Peerset`] itself together with a [`PeersetHandle`] that can be cloned and
	/// used to send commands to it.
	pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) {
		let (tx, rx) = tracing_unbounded("mpsc_peerset_messages", 10_000);

		let handle = PeersetHandle { tx: tx.clone() };

		let mut peerset = {
			let now = Instant::now();

			Self {
				data: peersstate::PeersState::new(config.sets.iter().map(|set| {
					peersstate::SetConfig { in_peers: set.in_peers, out_peers: set.out_peers }
				})),
				tx,
				rx,
				reserved_nodes: config
					.sets
					.iter()
					.map(|set| (set.reserved_nodes.clone(), set.reserved_only))
					.collect(),
				message_queue: VecDeque::new(),
				created: now,
				latest_time_update: now,
				// Zero delay so that slots are allocated on the very first poll.
				next_periodic_alloc_slots: Delay::new(Duration::new(0, 0)),
			}
		};

		for (set, set_config) in config.sets.into_iter().enumerate() {
			// Reserved nodes are flagged as not occupying any slot.
			for node in set_config.reserved_nodes {
				peerset.data.add_no_slot_node(set, node);
			}

			// Bootnodes are merely marked as discovered; the `alloc_slots` calls below will
			// dial them if slots are available.
			for peer_id in set_config.bootnodes {
				if let peersstate::Peer::Unknown(entry) = peerset.data.peer(set, &peer_id) {
					entry.discover();
				} else {
					debug!(target: "peerset", "Duplicate bootnode in config: {:?}", peer_id);
				}
			}
		}

		for set_index in 0..peerset.data.num_sets() {
			peerset.alloc_slots(SetId(set_index));
		}

		(peerset, handle)
	}
323
324	fn on_add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) {
325		let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id);
326		if !newly_inserted {
327			return
328		}
329
330		self.data.add_no_slot_node(set_id.0, peer_id);
331		self.alloc_slots(set_id);
332	}
333
	/// Handles [`Action::RemoveReservedPeer`]: removes the reserved marking of `peer_id`.
	fn on_remove_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) {
		// Nothing to do if the peer wasn't reserved in the first place.
		if !self.reserved_nodes[set_id.0].0.remove(&peer_id) {
			return
		}

		// The peer now occupies a regular slot again.
		self.data.remove_no_slot_node(set_id.0, &peer_id);

		// Nothing more to do if not in reserved-only mode.
		if !self.reserved_nodes[set_id.0].1 {
			return
		}

		// If, however, the peerset is in reserved-only mode, then the removed node needs to be
		// disconnected.
		if let peersstate::Peer::Connected(peer) = self.data.peer(set_id.0, &peer_id) {
			peer.disconnect();
			self.message_queue.push_back(Message::Drop { set_id, peer_id });
		}
	}
353
354	fn on_set_reserved_peers(&mut self, set_id: SetId, peer_ids: HashSet<PeerId>) {
355		// Determine the difference between the current group and the new list.
356		let (to_insert, to_remove) = {
357			let to_insert = peer_ids
358				.difference(&self.reserved_nodes[set_id.0].0)
359				.cloned()
360				.collect::<Vec<_>>();
361			let to_remove = self.reserved_nodes[set_id.0]
362				.0
363				.difference(&peer_ids)
364				.cloned()
365				.collect::<Vec<_>>();
366			(to_insert, to_remove)
367		};
368
369		for node in to_insert {
370			self.on_add_reserved_peer(set_id, node);
371		}
372
373		for node in to_remove {
374			self.on_remove_reserved_peer(set_id, node);
375		}
376	}
377
	/// Handles [`Action::SetReservedOnly`]: toggles reserved-only mode for `set_id`.
	fn on_set_reserved_only(&mut self, set_id: SetId, reserved_only: bool) {
		self.reserved_nodes[set_id.0].1 = reserved_only;

		if reserved_only {
			// Disconnect all the nodes that aren't reserved.
			// `collect` is needed because disconnecting requires `&mut self.data` while we
			// enumerate its connected peers.
			for peer_id in
				self.data.connected_peers(set_id.0).cloned().collect::<Vec<_>>().into_iter()
			{
				if self.reserved_nodes[set_id.0].0.contains(&peer_id) {
					continue
				}

				let peer = self.data.peer(set_id.0, &peer_id).into_connected().expect(
					"We are enumerating connected peers, therefore the peer is connected; qed",
				);
				peer.disconnect();
				self.message_queue.push_back(Message::Drop { set_id, peer_id });
			}
		} else {
			// Leaving reserved-only mode: slots may now be filled with non-reserved peers.
			self.alloc_slots(set_id);
		}
	}
400
401	/// Returns the list of reserved peers.
402	pub fn reserved_peers(&self, set_id: SetId) -> impl Iterator<Item = &PeerId> {
403		self.reserved_nodes[set_id.0].0.iter()
404	}
405
406	/// Adds a node to the given set. The peerset will, if possible and not already the case,
407	/// try to connect to it.
408	///
409	/// > **Note**: This has the same effect as [`PeersetHandle::add_to_peers_set`].
410	pub fn add_to_peers_set(&mut self, set_id: SetId, peer_id: PeerId) {
411		if let peersstate::Peer::Unknown(entry) = self.data.peer(set_id.0, &peer_id) {
412			entry.discover();
413			self.alloc_slots(set_id);
414		}
415	}
416
	/// Handles [`Action::RemoveFromPeersSet`]: forgets `peer_id` from `set_id`, disconnecting
	/// it first if it was connected. Reserved peers are never removed this way.
	fn on_remove_from_peers_set(&mut self, set_id: SetId, peer_id: PeerId) {
		// Don't do anything if node is reserved.
		if self.reserved_nodes[set_id.0].0.contains(&peer_id) {
			return
		}

		match self.data.peer(set_id.0, &peer_id) {
			peersstate::Peer::Connected(peer) => {
				// Queue the disconnect notification, then erase the peer from the set's state.
				self.message_queue.push_back(Message::Drop { set_id, peer_id: *peer.peer_id() });
				peer.disconnect().forget_peer();
			},
			peersstate::Peer::NotConnected(peer) => {
				peer.forget_peer();
			},
			peersstate::Peer::Unknown(_) => {},
		}
	}
434
	/// Handles [`Action::ReportPeer`]: applies the reputation adjustment and, if the peer falls
	/// below [`BANNED_THRESHOLD`], disconnects it from every set.
	fn on_report_peer(&mut self, peer_id: PeerId, change: ReputationChange) {
		// We want reputations to be up-to-date before adjusting them.
		self.update_time();

		let mut reputation = self.data.peer_reputation(peer_id);
		reputation.add_reputation(change.value);
		if reputation.reputation() >= BANNED_THRESHOLD {
			// Still above the ban threshold: nothing else to do.
			trace!(target: "peerset", "Report {}: {:+} to {}. Reason: {}",
				peer_id, change.value, reputation.reputation(), change.reason
			);
			return
		}

		debug!(target: "peerset", "Report {}: {:+} to {}. Reason: {}, Disconnecting",
			peer_id, change.value, reputation.reputation(), change.reason
		);

		// Release the borrow on `self.data` before mutating it below.
		drop(reputation);

		// The peer is now banned: drop it from every set it is connected in, and try to refill
		// the freed slots with better-behaved peers.
		for set_index in 0..self.data.num_sets() {
			if let peersstate::Peer::Connected(peer) = self.data.peer(set_index, &peer_id) {
				let peer = peer.disconnect();
				self.message_queue.push_back(Message::Drop {
					set_id: SetId(set_index),
					peer_id: peer.into_peer_id(),
				});

				self.alloc_slots(SetId(set_index));
			}
		}
	}
466
467	fn on_peer_reputation(&mut self, peer_id: PeerId, pending_response: oneshot::Sender<i32>) {
468		let reputation = self.data.peer_reputation(peer_id);
469		let _ = pending_response.send(reputation.reputation());
470	}
471
	/// Updates the value of `self.latest_time_update` and performs all the updates that happen
	/// over time, such as reputation increases for staying connected.
	fn update_time(&mut self) {
		let now = Instant::now();

		// We basically do `(now - self.latest_update).as_secs()`, except that by the way we do it
		// we know that we're not going to miss seconds because of rounding to integers.
		let secs_diff = {
			let elapsed_latest = self.latest_time_update - self.created;
			let elapsed_now = now - self.created;
			self.latest_time_update = now;
			elapsed_now.as_secs() - elapsed_latest.as_secs()
		};

		// For each elapsed second, move the node reputation towards zero.
		// If we multiply each second the reputation by `k` (where `k` is between 0 and 1), it
		// takes `ln(0.5) / ln(k)` seconds to reduce the reputation by half. Use this formula to
		// empirically determine a value of `k` that looks correct.
		for _ in 0..secs_diff {
			// `collect` is needed because adjusting reputations below requires `&mut self.data`
			// while we iterate over its peers.
			for peer_id in self.data.peers().cloned().collect::<Vec<_>>() {
				// We use `k = 0.98`, so we divide by `50`. With that value, it takes 34.3 seconds
				// to reduce the reputation by half.
				fn reput_tick(reput: i32) -> i32 {
					let mut diff = reput / 50;
					// Make sure non-zero reputations always make progress towards zero.
					if diff == 0 && reput < 0 {
						diff = -1;
					} else if diff == 0 && reput > 0 {
						diff = 1;
					}
					reput.saturating_sub(diff)
				}

				let mut peer_reputation = self.data.peer_reputation(peer_id);

				let before = peer_reputation.reputation();
				let after = reput_tick(before);
				trace!(target: "peerset", "Fleeting {}: {} -> {}", peer_id, before, after);
				peer_reputation.set_reputation(after);

				if after != 0 {
					continue
				}

				// Release the borrow on `self.data` before inspecting the peer's sets below.
				drop(peer_reputation);

				// If the peer reaches a reputation of 0, and there is no connection to it,
				// forget it.
				for set_index in 0..self.data.num_sets() {
					match self.data.peer(set_index, &peer_id) {
						peersstate::Peer::Connected(_) => {},
						peersstate::Peer::NotConnected(peer) => {
							if peer.last_connected_or_discovered() + FORGET_AFTER < now {
								peer.forget_peer();
							}
						},
						peersstate::Peer::Unknown(_) => {
							// Happens if this peer does not belong to this set.
						},
					}
				}
			}
		}
	}
535
	/// Try to fill available out slots with nodes for the given set.
	///
	/// Reserved peers are always dialed first (they don't occupy slots); then free outgoing
	/// slots are filled with the best-reputation known peers.
	fn alloc_slots(&mut self, set_id: SetId) {
		// Bring reputations up to date first, since they decide who we connect to.
		self.update_time();

		// Try to connect to all the reserved nodes that we are not connected to.
		for reserved_node in &self.reserved_nodes[set_id.0].0 {
			let entry = match self.data.peer(set_id.0, reserved_node) {
				peersstate::Peer::Unknown(n) => n.discover(),
				peersstate::Peer::NotConnected(n) => n,
				peersstate::Peer::Connected(_) => continue,
			};

			// Don't connect to nodes with an abysmal reputation, even if they're reserved.
			// This is a rather opinionated behaviour, and it wouldn't be fundamentally wrong to
			// remove that check. If necessary, the peerset should be refactored to give more
			// control over what happens in that situation.
			// NOTE(review): `break` aborts the whole loop, skipping the remaining reserved
			// nodes (in arbitrary `HashSet` order) until the next periodic call — confirm
			// that `continue` wasn't intended here.
			if entry.reputation() < BANNED_THRESHOLD {
				break
			}

			match entry.try_outgoing() {
				Ok(conn) => self
					.message_queue
					.push_back(Message::Connect { set_id, peer_id: conn.into_peer_id() }),
				Err(_) => {
					// An error is returned only if no slot is available. Reserved nodes are
					// marked in the state machine with a flag saying "doesn't occupy a slot",
					// and as such this should never happen.
					debug_assert!(false);
					log::error!(
						target: "peerset",
						"Not enough slots to connect to reserved node"
					);
				},
			}
		}

		// Now, we try to connect to other nodes.

		// Nothing more to do if we're in reserved mode.
		if self.reserved_nodes[set_id.0].1 {
			return
		}

		// Try to grab the next node to attempt to connect to.
		// Since `highest_not_connected_peer` is rather expensive to call, check beforehand
		// whether we have an available slot.
		while self.data.has_free_outgoing_slot(set_id.0) {
			let next = match self.data.highest_not_connected_peer(set_id.0) {
				Some(n) => n,
				None => break,
			};

			// Don't connect to nodes with an abysmal reputation. Since candidates are returned
			// highest-reputation first, no later candidate could pass this check either.
			if next.reputation() < BANNED_THRESHOLD {
				break
			}

			match next.try_outgoing() {
				Ok(conn) => self
					.message_queue
					.push_back(Message::Connect { set_id, peer_id: conn.into_peer_id() }),
				Err(_) => {
					// This branch can only be entered if there is no free slot, which is
					// checked above.
					debug_assert!(false);
					break
				},
			}
		}
	}
607
	/// Indicate that we received an incoming connection. Must be answered either with
	/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
	///
	/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
	/// connection implicitly means `Connect`, but incoming connections aren't cancelled by
	/// `dropped`.
	// Implementation note: because of concurrency issues, it is possible that we push a `Connect`
	// message to the output channel with a `PeerId`, and that `incoming` gets called with the same
	// `PeerId` before that message has been read by the user. In this situation we must not answer.
	pub fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) {
		trace!(target: "peerset", "Incoming {:?}", peer_id);

		self.update_time();

		// In reserved-only mode, only reserved peers may connect to us.
		if self.reserved_nodes[set_id.0].1 && !self.reserved_nodes[set_id.0].0.contains(&peer_id) {
			self.message_queue.push_back(Message::Reject(index));
			return
		}

		let not_connected = match self.data.peer(set_id.0, &peer_id) {
			// If we're already connected, don't answer, as the docs mention.
			peersstate::Peer::Connected(_) => return,
			peersstate::Peer::NotConnected(mut entry) => {
				entry.bump_last_connected_or_discovered();
				entry
			},
			peersstate::Peer::Unknown(entry) => entry.discover(),
		};

		// Banned peers are refused regardless of slot availability.
		if not_connected.reputation() < BANNED_THRESHOLD {
			self.message_queue.push_back(Message::Reject(index));
			return
		}

		// Accept if the state machine lets us (e.g. a free incoming slot), otherwise reject.
		match not_connected.try_accept_incoming() {
			Ok(_) => self.message_queue.push_back(Message::Accept(index)),
			Err(_) => self.message_queue.push_back(Message::Reject(index)),
		}
	}
647
	/// Indicate that we dropped an active connection with a peer, or that we failed to connect.
	///
	/// Must only be called after the PSM has either generated a `Connect` message with this
	/// `PeerId`, or accepted an incoming connection with this `PeerId`.
	pub fn dropped(&mut self, set_id: SetId, peer_id: PeerId, reason: DropReason) {
		// We want reputations to be up-to-date before adjusting them.
		self.update_time();

		match self.data.peer(set_id.0, &peer_id) {
			peersstate::Peer::Connected(mut entry) => {
				// Decrease the node's reputation so that we don't try it again and again and again.
				entry.add_reputation(DISCONNECT_REPUTATION_CHANGE);
				trace!(target: "peerset", "Dropping {}: {:+} to {}",
					peer_id, DISCONNECT_REPUTATION_CHANGE, entry.reputation());
				entry.disconnect();
			},
			peersstate::Peer::NotConnected(_) | peersstate::Peer::Unknown(_) => {
				// Contract violation by the caller; see the doc comment above.
				error!(target: "peerset", "Received dropped() for non-connected node")
			},
		}

		// A refusal means the peer doesn't actually belong to this set; forget it entirely.
		if let DropReason::Refused = reason {
			self.on_remove_from_peers_set(set_id, peer_id);
		}

		// A slot may have been freed; try to fill it again.
		self.alloc_slots(set_id);
	}
675
676	/// Reports an adjustment to the reputation of the given peer.
677	pub fn report_peer(&mut self, peer_id: PeerId, score_diff: ReputationChange) {
678		// We don't immediately perform the adjustments in order to have state consistency. We
679		// don't want the reporting here to take priority over messages sent using the
680		// `PeersetHandle`.
681		let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
682	}
683
	/// Produces a JSON object containing the state of the peerset manager, for debugging purposes.
	pub fn debug_info(&mut self) -> serde_json::Value {
		// Refresh reputations so that the reported values are current.
		self.update_time();

		json!({
			"sets": (0..self.data.num_sets()).map(|set_index| {
				json!({
					"nodes": self.data.peers().cloned().collect::<Vec<_>>().into_iter().filter_map(|peer_id| {
						let state = match self.data.peer(set_index, &peer_id) {
							peersstate::Peer::Connected(entry) => json!({
								"connected": true,
								"reputation": entry.reputation()
							}),
							peersstate::Peer::NotConnected(entry) => json!({
								"connected": false,
								"reputation": entry.reputation()
							}),
							// Peers that don't belong to this set are skipped.
							peersstate::Peer::Unknown(_) => return None,
						};

						Some((peer_id.to_base58(), state))
					}).collect::<HashMap<_, _>>(),
					"reserved_nodes": self.reserved_nodes[set_index].0.iter().map(|peer_id| {
						peer_id.to_base58()
					}).collect::<HashSet<_>>(),
					"reserved_only": self.reserved_nodes[set_index].1,
				})
			}).collect::<Vec<_>>(),
			"message_queue": self.message_queue.len(),
		})
	}
715
	/// Returns the number of peers that we have discovered.
	pub fn num_discovered_peers(&self) -> usize {
		// `peers()` yields every known peer, across all sets.
		self.data.peers().len()
	}
720}
721
impl Stream for Peerset {
	type Item = Message;

	/// Drives the peerset: yields queued [`Message`]s, performs the periodic slot allocation,
	/// and processes [`Action`]s received from the handles.
	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
		loop {
			// First flush any message queued by earlier processing.
			if let Some(message) = self.message_queue.pop_front() {
				return Poll::Ready(Some(message))
			}

			// Once per second, try to fill the slots of every set. The period matches that of
			// the reputation updates performed by `update_time`.
			if Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx).is_ready() {
				self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0));

				for set_index in 0..self.data.num_sets() {
					self.alloc_slots(SetId(set_index));
				}
			}

			// The stream never finishes: if all handles were dropped (`Ready(None)`), we stay
			// pending forever rather than ending the stream.
			let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
				Poll::Pending => return Poll::Pending,
				Poll::Ready(Some(event)) => event,
				Poll::Ready(None) => return Poll::Pending,
			};

			// Dispatch the action, then loop back to flush any message it produced.
			match action {
				Action::AddReservedPeer(set_id, peer_id) =>
					self.on_add_reserved_peer(set_id, peer_id),
				Action::RemoveReservedPeer(set_id, peer_id) =>
					self.on_remove_reserved_peer(set_id, peer_id),
				Action::SetReservedPeers(set_id, peer_ids) =>
					self.on_set_reserved_peers(set_id, peer_ids),
				Action::SetReservedOnly(set_id, reserved) =>
					self.on_set_reserved_only(set_id, reserved),
				Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff),
				Action::AddToPeersSet(sets_name, peer_id) =>
					self.add_to_peers_set(sets_name, peer_id),
				Action::RemoveFromPeersSet(sets_name, peer_id) =>
					self.on_remove_from_peers_set(sets_name, peer_id),
				Action::PeerReputation(peer_id, pending_response) =>
					self.on_peer_reputation(peer_id, pending_response),
			}
		}
	}
}
765
/// Reason for calling [`Peerset::dropped`].
//
// `Debug` is derived for consistency with the other public types of this crate and to make the
// reason loggable by callers.
#[derive(Debug)]
pub enum DropReason {
	/// Substream or connection has been closed for an unknown reason.
	Unknown,
	/// Substream or connection has been explicitly refused by the target. In other words, the
	/// peer doesn't actually belong to this set.
	///
	/// This has the side effect of calling [`PeersetHandle::remove_from_peers_set`].
	Refused,
}
776
777#[cfg(test)]
778mod tests {
779	use super::{
780		IncomingIndex, Message, Peerset, PeersetConfig, ReputationChange, SetConfig, SetId,
781		BANNED_THRESHOLD,
782	};
783	use futures::prelude::*;
784	use libp2p::PeerId;
785	use std::{pin::Pin, task::Poll, thread, time::Duration};
786
787	fn assert_messages(mut peerset: Peerset, messages: Vec<Message>) -> Peerset {
788		for expected_message in messages {
789			let (message, p) = next_message(peerset).expect("expected message");
790			assert_eq!(message, expected_message);
791			peerset = p;
792		}
793		peerset
794	}
795
796	fn next_message(mut peerset: Peerset) -> Result<(Message, Peerset), ()> {
797		let next = futures::executor::block_on_stream(&mut peerset).next();
798		let message = next.ok_or(())?;
799		Ok((message, peerset))
800	}
801
	#[test]
	fn test_peerset_add_reserved_peer() {
		let bootnode = PeerId::random();
		let reserved_peer = PeerId::random();
		let reserved_peer2 = PeerId::random();
		// `reserved_only` is `true`, so the bootnode must never be dialed; only the reserved
		// peers added below may receive a `Connect`.
		let config = PeersetConfig {
			sets: vec![SetConfig {
				in_peers: 0,
				out_peers: 2,
				bootnodes: vec![bootnode],
				reserved_nodes: Default::default(),
				reserved_only: true,
			}],
		};

		let (peerset, handle) = Peerset::from_config(config);
		handle.add_reserved_peer(SetId::from(0), reserved_peer);
		handle.add_reserved_peer(SetId::from(0), reserved_peer2);

		// The actions are processed in order, each triggering an `alloc_slots`, so the
		// `Connect` messages come out in insertion order.
		assert_messages(
			peerset,
			vec![
				Message::Connect { set_id: SetId::from(0), peer_id: reserved_peer },
				Message::Connect { set_id: SetId::from(0), peer_id: reserved_peer2 },
			],
		);
	}
829
830	#[test]
831	fn test_peerset_incoming() {
832		let bootnode = PeerId::random();
833		let incoming = PeerId::random();
834		let incoming2 = PeerId::random();
835		let incoming3 = PeerId::random();
836		let ii = IncomingIndex(1);
837		let ii2 = IncomingIndex(2);
838		let ii3 = IncomingIndex(3);
839		let ii4 = IncomingIndex(3);
840		let config = PeersetConfig {
841			sets: vec![SetConfig {
842				in_peers: 2,
843				out_peers: 1,
844				bootnodes: vec![bootnode],
845				reserved_nodes: Default::default(),
846				reserved_only: false,
847			}],
848		};
849
850		let (mut peerset, _handle) = Peerset::from_config(config);
851		peerset.incoming(SetId::from(0), incoming, ii);
852		peerset.incoming(SetId::from(0), incoming, ii4);
853		peerset.incoming(SetId::from(0), incoming2, ii2);
854		peerset.incoming(SetId::from(0), incoming3, ii3);
855
856		assert_messages(
857			peerset,
858			vec![
859				Message::Connect { set_id: SetId::from(0), peer_id: bootnode },
860				Message::Accept(ii),
861				Message::Accept(ii2),
862				Message::Reject(ii3),
863			],
864		);
865	}
866
	#[test]
	fn test_peerset_reject_incoming_in_reserved_only() {
		// With `reserved_only` enabled and no reserved nodes configured, every incoming
		// connection must be rejected regardless of the generous slot counts.
		let incoming = PeerId::random();
		let ii = IncomingIndex(1);
		let config = PeersetConfig {
			sets: vec![SetConfig {
				in_peers: 50,
				out_peers: 50,
				bootnodes: vec![],
				reserved_nodes: Default::default(),
				reserved_only: true,
			}],
		};

		let (mut peerset, _) = Peerset::from_config(config);
		peerset.incoming(SetId::from(0), incoming, ii);

		assert_messages(peerset, vec![Message::Reject(ii)]);
	}
886
	#[test]
	fn test_peerset_discovered() {
		let bootnode = PeerId::random();
		let discovered = PeerId::random();
		let discovered2 = PeerId::random();
		let config = PeersetConfig {
			sets: vec![SetConfig {
				in_peers: 0,
				out_peers: 2,
				bootnodes: vec![bootnode],
				reserved_nodes: Default::default(),
				reserved_only: false,
			}],
		};

		let (mut peerset, _handle) = Peerset::from_config(config);
		peerset.add_to_peers_set(SetId::from(0), discovered);
		// Adding the same peer twice is a no-op: `add_to_peers_set` only acts on unknown peers.
		peerset.add_to_peers_set(SetId::from(0), discovered);
		peerset.add_to_peers_set(SetId::from(0), discovered2);

		// Only 2 outgoing slots: the bootnode (dialed at construction) and `discovered` fill
		// them, so `discovered2` gets no `Connect`.
		assert_messages(
			peerset,
			vec![
				Message::Connect { set_id: SetId::from(0), peer_id: bootnode },
				Message::Connect { set_id: SetId::from(0), peer_id: discovered },
			],
		);
	}
915
	#[test]
	fn test_peerset_banned() {
		let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
			sets: vec![SetConfig {
				in_peers: 25,
				out_peers: 25,
				bootnodes: vec![],
				reserved_nodes: Default::default(),
				reserved_only: false,
			}],
		});

		// We ban a node by setting its reputation under the threshold.
		let peer_id = PeerId::random();
		handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));

		let fut = futures::future::poll_fn(move |cx| {
			// We need one polling for the message to be processed.
			assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);

			// Check that an incoming connection from that node gets refused.
			peerset.incoming(SetId::from(0), peer_id, IncomingIndex(1));
			if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
				assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
			} else {
				panic!()
			}

			// Wait a bit for the node's reputation to go above the threshold.
			// Reputations decay towards zero once per elapsed second (see `update_time`, which
			// `incoming` calls), so 1.5s is enough for a value one point below the threshold.
			thread::sleep(Duration::from_millis(1500));

			// Try again. This time the node should be accepted.
			peerset.incoming(SetId::from(0), peer_id, IncomingIndex(2));
			// NOTE(review): if the stream were pending here, this loop body would never run
			// and the test would pass vacuously — consider asserting a message was seen.
			while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
				assert_eq!(msg.unwrap(), Message::Accept(IncomingIndex(2)));
			}

			Poll::Ready(())
		});

		futures::executor::block_on(fut);
	}
958
959	#[test]
960	fn test_relloc_after_banned() {
961		let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
962			sets: vec![SetConfig {
963				in_peers: 25,
964				out_peers: 25,
965				bootnodes: vec![],
966				reserved_nodes: Default::default(),
967				reserved_only: false,
968			}],
969		});
970
971		// We ban a node by setting its reputation under the threshold.
972		let peer_id = PeerId::random();
973		handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));
974
975		let fut = futures::future::poll_fn(move |cx| {
976			// We need one polling for the message to be processed.
977			assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);
978
979			// Check that an incoming connection from that node gets refused.
980			// This is already tested in other tests, but it is done again here because it doesn't
981			// hurt.
982			peerset.incoming(SetId::from(0), peer_id, IncomingIndex(1));
983			if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
984				assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
985			} else {
986				panic!()
987			}
988
989			// Wait for the peerset to change its mind and actually connect to it.
990			while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
991				assert_eq!(msg.unwrap(), Message::Connect { set_id: SetId::from(0), peer_id });
992			}
993
994			Poll::Ready(())
995		});
996
997		futures::executor::block_on(fut);
998	}
999}