Skip to main content

soil_network/
peer_store.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! [`PeerStore`] manages peer reputations and provides connection candidates to
8//! [`crate::protocol_controller::ProtocolController`].
9
10use crate::service::{metrics::PeerStoreMetrics, traits::PeerStore as PeerStoreT};
11
12use crate::common::{role::ObservedRole, types::ReputationChange};
13use libp2p::PeerId;
14use log::trace;
15use parking_lot::Mutex;
16use partial_sort::PartialSort;
17use soil_prometheus::Registry;
18use std::{
19	cmp::{Ord, Ordering, PartialOrd},
20	collections::{hash_map::Entry, HashMap, HashSet},
21	fmt::Debug,
22	sync::Arc,
23	time::{Duration, Instant},
24};
25use wasm_timer::Delay;
26
/// Target under which this file emits its log records.
pub const LOG_TARGET: &str = "peerset";

/// Ban threshold: a node whose reputation is strictly below this value is not accepted.
///
/// Computed as 71 times one hundredth of `i32::MIN` (integer division).
pub const BANNED_THRESHOLD: i32 = (i32::MIN / 100) * 71;
/// Reputation change applied to a node when we get disconnected from it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -256;
/// Inverse of the relative reputation decrement applied every second. With a value of 200 the
/// absolute value of the reputation shrinks by 1/200 per second.
///
/// This corresponds to a factor of `k = 0.995`, where `k = 1 - 1 / INVERSE_DECREMENT`.
///
/// Halving the reputation takes about `ln(0.5) / ln(k)` seconds, i.e. roughly 138 seconds for
/// the value above.
///
/// In this setup:
/// - `i32::MAX` becomes 0 in exactly 3544 seconds, or approximately 59 minutes
/// - `i32::MIN` becomes 0 in exactly 3544 seconds, or approximately 59 minutes
/// - `i32::MIN` escapes the banned threshold in 69 seconds
const INVERSE_DECREMENT: i32 = 200;
/// How long a [`PeerStore`] entry is kept after its last update once its reputation has
/// reached 0; after that it is removed.
const FORGET_AFTER: Duration = Duration::from_secs(60 * 60);
50
51/// Trait describing the required functionality from a `Peerset` handle.
52pub trait ProtocolHandle: Debug + Send + Sync {
53	/// Disconnect peer.
54	fn disconnect_peer(&self, peer_id: crate::types::PeerId);
55}
56
57/// Trait providing peer reputation management and connection candidates.
58pub trait PeerStoreProvider: Debug + Send + Sync {
59	/// Check whether the peer is banned.
60	fn is_banned(&self, peer_id: &crate::types::PeerId) -> bool;
61
62	/// Register a protocol handle to disconnect peers whose reputation drops below the threshold.
63	fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>);
64
65	/// Report peer disconnection for reputation adjustment.
66	fn report_disconnect(&self, peer_id: crate::types::PeerId);
67
68	/// Adjust peer reputation.
69	fn report_peer(&self, peer_id: crate::types::PeerId, change: ReputationChange);
70
71	/// Set peer role.
72	fn set_peer_role(&self, peer_id: &crate::types::PeerId, role: ObservedRole);
73
74	/// Get peer reputation.
75	fn peer_reputation(&self, peer_id: &crate::types::PeerId) -> i32;
76
77	/// Get peer role, if available.
78	fn peer_role(&self, peer_id: &crate::types::PeerId) -> Option<ObservedRole>;
79
80	/// Get candidates with highest reputations for initiating outgoing connections.
81	fn outgoing_candidates(
82		&self,
83		count: usize,
84		ignored: HashSet<crate::types::PeerId>,
85	) -> Vec<crate::types::PeerId>;
86
87	/// Add known peer.
88	fn add_known_peer(&self, peer_id: crate::types::PeerId);
89}
90
91/// Actual implementation of peer reputations and connection candidates provider.
92#[derive(Debug, Clone)]
93pub struct PeerStoreHandle {
94	inner: Arc<Mutex<PeerStoreInner>>,
95}
96
97impl PeerStoreProvider for PeerStoreHandle {
98	fn is_banned(&self, peer_id: &crate::types::PeerId) -> bool {
99		self.inner.lock().is_banned(&peer_id.into())
100	}
101
102	fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>) {
103		self.inner.lock().register_protocol(protocol_handle);
104	}
105
106	fn report_disconnect(&self, peer_id: crate::types::PeerId) {
107		let mut inner = self.inner.lock();
108		inner.report_disconnect(peer_id.into())
109	}
110
111	fn report_peer(&self, peer_id: crate::types::PeerId, change: ReputationChange) {
112		let mut inner = self.inner.lock();
113		inner.report_peer(peer_id.into(), change)
114	}
115
116	fn set_peer_role(&self, peer_id: &crate::types::PeerId, role: ObservedRole) {
117		let mut inner = self.inner.lock();
118		inner.set_peer_role(&peer_id.into(), role)
119	}
120
121	fn peer_reputation(&self, peer_id: &crate::types::PeerId) -> i32 {
122		self.inner.lock().peer_reputation(&peer_id.into())
123	}
124
125	fn peer_role(&self, peer_id: &crate::types::PeerId) -> Option<ObservedRole> {
126		self.inner.lock().peer_role(&peer_id.into())
127	}
128
129	fn outgoing_candidates(
130		&self,
131		count: usize,
132		ignored: HashSet<crate::types::PeerId>,
133	) -> Vec<crate::types::PeerId> {
134		self.inner
135			.lock()
136			.outgoing_candidates(count, ignored.iter().map(|peer_id| (*peer_id).into()).collect())
137			.iter()
138			.map(|peer_id| peer_id.into())
139			.collect()
140	}
141
142	fn add_known_peer(&self, peer_id: crate::types::PeerId) {
143		self.inner.lock().add_known_peer(peer_id.into());
144	}
145}
146
147#[derive(Debug, Clone, Copy)]
148struct PeerInfo {
149	/// Reputation of the peer.
150	reputation: i32,
151
152	/// Instant when the peer was last updated.
153	last_updated: Instant,
154
155	/// Role of the peer, if known.
156	role: Option<ObservedRole>,
157}
158
159impl Default for PeerInfo {
160	fn default() -> Self {
161		Self { reputation: 0, last_updated: Instant::now(), role: None }
162	}
163}
164
165impl PartialEq for PeerInfo {
166	fn eq(&self, other: &Self) -> bool {
167		self.reputation == other.reputation
168	}
169}
170
171impl Eq for PeerInfo {}
172
173impl Ord for PeerInfo {
174	// We define reverse order by reputation values.
175	fn cmp(&self, other: &Self) -> Ordering {
176		self.reputation.cmp(&other.reputation).reverse()
177	}
178}
179
180impl PartialOrd for PeerInfo {
181	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
182		Some(self.cmp(other))
183	}
184}
185
186impl PeerInfo {
187	fn is_banned(&self) -> bool {
188		self.reputation < BANNED_THRESHOLD
189	}
190
191	fn add_reputation(&mut self, increment: i32) {
192		self.reputation = self.reputation.saturating_add(increment);
193		self.bump_last_updated();
194	}
195
196	fn decay_reputation(&mut self, seconds_passed: u64) {
197		// Note that decaying the reputation value happens "on its own",
198		// so we don't do `bump_last_updated()`.
199		for _ in 0..seconds_passed {
200			let mut diff = self.reputation / INVERSE_DECREMENT;
201			if diff == 0 && self.reputation < 0 {
202				diff = -1;
203			} else if diff == 0 && self.reputation > 0 {
204				diff = 1;
205			}
206
207			self.reputation = self.reputation.saturating_sub(diff);
208
209			if self.reputation == 0 {
210				break;
211			}
212		}
213	}
214
215	fn bump_last_updated(&mut self) {
216		self.last_updated = Instant::now();
217	}
218}
219
220#[derive(Debug)]
221struct PeerStoreInner {
222	peers: HashMap<PeerId, PeerInfo>,
223	protocols: Vec<Arc<dyn ProtocolHandle>>,
224	metrics: Option<PeerStoreMetrics>,
225}
226
227impl PeerStoreInner {
228	fn is_banned(&self, peer_id: &PeerId) -> bool {
229		self.peers.get(peer_id).map_or(false, |info| info.is_banned())
230	}
231
232	fn register_protocol(&mut self, protocol_handle: Arc<dyn ProtocolHandle>) {
233		self.protocols.push(protocol_handle);
234	}
235
236	fn report_disconnect(&mut self, peer_id: PeerId) {
237		let peer_info = self.peers.entry(peer_id).or_default();
238		peer_info.add_reputation(DISCONNECT_REPUTATION_CHANGE);
239
240		log::trace!(
241			target: LOG_TARGET,
242			"Peer {} disconnected, reputation: {:+} to {}",
243			peer_id,
244			DISCONNECT_REPUTATION_CHANGE,
245			peer_info.reputation,
246		);
247	}
248
249	fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange) {
250		let peer_info = self.peers.entry(peer_id).or_default();
251		let was_banned = peer_info.is_banned();
252		peer_info.add_reputation(change.value);
253
254		log::trace!(
255			target: LOG_TARGET,
256			"Report {}: {:+} to {}. Reason: {}.",
257			peer_id,
258			change.value,
259			peer_info.reputation,
260			change.reason,
261		);
262
263		if !peer_info.is_banned() {
264			if was_banned {
265				log::info!(
266					target: LOG_TARGET,
267					"Peer {} is now unbanned: {:+} to {}. Reason: {}.",
268					peer_id,
269					change.value,
270					peer_info.reputation,
271					change.reason,
272				);
273			}
274			return;
275		}
276
277		// Peer is currently banned, disconnect it from all protocols.
278		self.protocols.iter().for_each(|handle| handle.disconnect_peer(peer_id.into()));
279
280		// The peer is banned for the first time.
281		if !was_banned {
282			log::warn!(
283				target: LOG_TARGET,
284				"Report {}: {:+} to {}. Reason: {}. Banned, disconnecting.",
285				peer_id,
286				change.value,
287				peer_info.reputation,
288				change.reason,
289			);
290			return;
291		}
292
293		// The peer was already banned and it got another negative report.
294		// This may happen during a batch report.
295		if change.value < 0 {
296			log::debug!(
297				target: LOG_TARGET,
298				"Report {}: {:+} to {}. Reason: {}. Misbehaved during the ban threshold.",
299				peer_id,
300				change.value,
301				peer_info.reputation,
302				change.reason,
303			);
304		}
305	}
306
307	fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole) {
308		log::trace!(target: LOG_TARGET, "Set {peer_id} role to {role:?}");
309
310		match self.peers.entry(*peer_id) {
311			Entry::Occupied(mut entry) => {
312				entry.get_mut().role = Some(role);
313			},
314			Entry::Vacant(entry) => {
315				entry.insert(PeerInfo { role: Some(role), ..Default::default() });
316			},
317		}
318	}
319
320	fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
321		self.peers.get(peer_id).map_or(0, |info| info.reputation)
322	}
323
324	fn peer_role(&self, peer_id: &PeerId) -> Option<ObservedRole> {
325		self.peers.get(peer_id).map_or(None, |info| info.role)
326	}
327
328	fn outgoing_candidates(&self, count: usize, ignored: HashSet<PeerId>) -> Vec<PeerId> {
329		let mut candidates = self
330			.peers
331			.iter()
332			.filter_map(|(peer_id, info)| {
333				(!info.is_banned() && !ignored.contains(peer_id)).then_some((*peer_id, *info))
334			})
335			.collect::<Vec<_>>();
336		let count = std::cmp::min(count, candidates.len());
337		candidates.partial_sort(count, |(_, info1), (_, info2)| info1.cmp(info2));
338		candidates.iter().take(count).map(|(peer_id, _)| *peer_id).collect()
339
340		// TODO: keep the peers sorted (in a "bi-multi-map"?) to not repeat sorting every time.
341	}
342
343	fn progress_time(&mut self, seconds_passed: u64) {
344		if seconds_passed == 0 {
345			return;
346		}
347
348		// Drive reputation values towards 0.
349		self.peers
350			.iter_mut()
351			.for_each(|(_, info)| info.decay_reputation(seconds_passed));
352
353		// Retain only entries with non-zero reputation values or not expired ones.
354		let now = Instant::now();
355		let mut num_banned_peers: u64 = 0;
356		self.peers.retain(|_, info| {
357			if info.is_banned() {
358				num_banned_peers += 1;
359			}
360
361			info.reputation != 0 || info.last_updated + FORGET_AFTER > now
362		});
363
364		if let Some(metrics) = &self.metrics {
365			metrics.num_discovered.set(self.peers.len() as u64);
366			metrics.num_banned_peers.set(num_banned_peers);
367		}
368	}
369
370	fn add_known_peer(&mut self, peer_id: PeerId) {
371		match self.peers.entry(peer_id) {
372			Entry::Occupied(mut e) => {
373				trace!(
374					target: LOG_TARGET,
375					"Trying to add an already known peer {peer_id}, bumping `last_updated`.",
376				);
377				e.get_mut().bump_last_updated();
378			},
379			Entry::Vacant(e) => {
380				trace!(target: LOG_TARGET, "Adding a new known peer {peer_id}.");
381				e.insert(PeerInfo::default());
382			},
383		}
384	}
385}
386
387/// Worker part of [`PeerStoreHandle`]
388#[derive(Debug)]
389pub struct PeerStore {
390	inner: Arc<Mutex<PeerStoreInner>>,
391}
392
393impl PeerStore {
394	/// Create a new peer store from the list of bootnodes.
395	pub fn new(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self {
396		let metrics = if let Some(registry) = &metrics_registry {
397			PeerStoreMetrics::register(registry)
398				.map_err(|err| {
399					log::error!(target: LOG_TARGET, "Failed to register peer set metrics: {}", err);
400					err
401				})
402				.ok()
403		} else {
404			None
405		};
406
407		PeerStore {
408			inner: Arc::new(Mutex::new(PeerStoreInner {
409				peers: bootnodes
410					.into_iter()
411					.map(|peer_id| (peer_id, PeerInfo::default()))
412					.collect(),
413				protocols: Vec::new(),
414				metrics,
415			})),
416		}
417	}
418
419	/// Get `PeerStoreHandle`.
420	pub fn handle(&self) -> PeerStoreHandle {
421		PeerStoreHandle { inner: self.inner.clone() }
422	}
423
424	/// Drive the `PeerStore`, decaying reputation values over time and removing expired entries.
425	pub async fn run(self) {
426		let started = Instant::now();
427		let mut latest_time_update = started;
428
429		loop {
430			let now = Instant::now();
431			// We basically do `(now - self.latest_update).as_secs()`, except that by the way we do
432			// it we know that we're not going to miss seconds because of rounding to integers.
433			let seconds_passed = {
434				let elapsed_latest = latest_time_update - started;
435				let elapsed_now = now - started;
436				latest_time_update = now;
437				elapsed_now.as_secs() - elapsed_latest.as_secs()
438			};
439
440			self.inner.lock().progress_time(seconds_passed);
441			let _ = Delay::new(Duration::from_secs(1)).await;
442		}
443	}
444}
445
446#[async_trait::async_trait]
447impl PeerStoreT for PeerStore {
448	fn handle(&self) -> Arc<dyn PeerStoreProvider> {
449		Arc::new(self.handle())
450	}
451
452	async fn run(self) {
453		self.run().await;
454	}
455}
456
#[cfg(test)]
mod tests {
	use super::{PeerInfo, PeerStore, PeerStoreProvider};

	/// Default entry with its reputation overridden.
	fn peer_with_reputation(reputation: i32) -> PeerInfo {
		PeerInfo { reputation, ..Default::default() }
	}

	#[test]
	fn decaying_zero_reputation_yields_zero() {
		let mut info = PeerInfo::default();
		assert_eq!(info.reputation, 0);

		// Neither a single second nor a very long period moves a zero reputation.
		info.decay_reputation(1);
		assert_eq!(info.reputation, 0);

		info.decay_reputation(100_000);
		assert_eq!(info.reputation, 0);
	}

	#[test]
	fn decaying_positive_reputation_decreases_it() {
		const INITIAL_REPUTATION: i32 = 100;

		let mut info = peer_with_reputation(INITIAL_REPUTATION);
		info.decay_reputation(1);

		assert!(info.reputation >= 0);
		assert!(info.reputation < INITIAL_REPUTATION);
	}

	#[test]
	fn decaying_negative_reputation_increases_it() {
		const INITIAL_REPUTATION: i32 = -100;

		let mut info = peer_with_reputation(INITIAL_REPUTATION);
		info.decay_reputation(1);

		assert!(info.reputation <= 0);
		assert!(info.reputation > INITIAL_REPUTATION);
	}

	#[test]
	fn decaying_max_reputation_finally_yields_zero() {
		const INITIAL_REPUTATION: i32 = i32::MAX;
		const SECONDS: u64 = 3544;

		let mut info = peer_with_reputation(INITIAL_REPUTATION);

		// Half-way through the reputation is still positive.
		info.decay_reputation(SECONDS / 2);
		assert!(info.reputation > 0);

		info.decay_reputation(SECONDS / 2);
		assert_eq!(info.reputation, 0);
	}

	#[test]
	fn decaying_min_reputation_finally_yields_zero() {
		const INITIAL_REPUTATION: i32 = i32::MIN;
		const SECONDS: u64 = 3544;

		let mut info = peer_with_reputation(INITIAL_REPUTATION);

		// Half-way through the reputation is still negative.
		info.decay_reputation(SECONDS / 2);
		assert!(info.reputation < 0);

		info.decay_reputation(SECONDS / 2);
		assert_eq!(info.reputation, 0);
	}

	#[test]
	fn report_banned_peers() {
		let peer_a = crate::types::PeerId::random();
		let peer_b = crate::types::PeerId::random();
		let peer_c = crate::types::PeerId::random();

		let bootnodes = vec![peer_a, peer_b, peer_c].into_iter().map(Into::into).collect();
		let peerstore = PeerStore::new(bootnodes, Some(soil_prometheus::Registry::new()));
		let metrics = peerstore.inner.lock().metrics.as_ref().unwrap().clone();
		let handle = peerstore.handle();

		// Check initial state. Advance time to propagate peers.
		handle.inner.lock().progress_time(1);
		assert_eq!(metrics.num_discovered.get(), 3);
		assert_eq!(metrics.num_banned_peers.get(), 0);

		// Report 2 peers with a negative reputation.
		for peer in [peer_a, peer_b] {
			handle.report_peer(
				peer,
				crate::common::types::ReputationChange { value: i32::MIN, reason: "test".into() },
			);
		}

		// Advance time to propagate banned peers.
		handle.inner.lock().progress_time(1);
		assert_eq!(metrics.num_discovered.get(), 3);
		assert_eq!(metrics.num_banned_peers.get(), 2);
	}
}