1use crate::service::{metrics::PeerStoreMetrics, traits::PeerStore as PeerStoreT};
11
12use crate::common::{role::ObservedRole, types::ReputationChange};
13use libp2p::PeerId;
14use log::trace;
15use parking_lot::Mutex;
16use partial_sort::PartialSort;
17use soil_prometheus::Registry;
18use std::{
19 cmp::{Ord, Ordering, PartialOrd},
20 collections::{hash_map::Entry, HashMap, HashSet},
21 fmt::Debug,
22 sync::Arc,
23 time::{Duration, Instant},
24};
25use wasm_timer::Delay;
26
/// Log target passed to every log statement in this file.
pub const LOG_TARGET: &str = "peerset";

/// Ban threshold: a peer whose reputation is strictly below this value is treated as banned
/// (see `PeerInfo::is_banned`). The integer division happens first, so the value is
/// `71 * (i32::MIN / 100)`, i.e. roughly 71% of `i32::MIN`.
pub const BANNED_THRESHOLD: i32 = 71 * (i32::MIN / 100);
/// Reputation change applied to a peer each time a disconnect is reported for it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -256;
/// Divisor of the per-second reputation decay: on every decayed second the reputation moves
/// towards zero by `reputation / INVERSE_DECREMENT`, or by 1 when that quotient is zero and the
/// reputation is not.
const INVERSE_DECREMENT: i32 = 200;
/// How long an entry whose reputation is zero is kept after its `last_updated` instant before
/// `progress_time` drops it.
const FORGET_AFTER: Duration = Duration::from_secs(3600);
50
/// Handle to a protocol that the peer store can ask to drop a peer.
///
/// Handles are registered through `PeerStoreProvider::register_protocol`; the store calls
/// `disconnect_peer` on every registered handle when a reported peer ends up banned.
pub trait ProtocolHandle: Debug + Send + Sync {
    /// Called for a peer that is banned after a report; implementors are expected to
    /// disconnect `peer_id` on their protocol.
    fn disconnect_peer(&self, peer_id: crate::types::PeerId);
}
56
/// Interface of a peer store: reputation bookkeeping, ban queries and candidate selection.
///
/// The behavior notes below describe the `PeerStoreHandle` implementation in this file; other
/// implementors may differ.
pub trait PeerStoreProvider: Debug + Send + Sync {
    /// Returns `true` if `peer_id` is currently banned. Unknown peers are not banned.
    fn is_banned(&self, peer_id: &crate::types::PeerId) -> bool;

    /// Registers a protocol handle that is asked to disconnect peers once they are banned.
    fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>);

    /// Reports that `peer_id` disconnected; a fixed reputation penalty is applied to it.
    fn report_disconnect(&self, peer_id: crate::types::PeerId);

    /// Adjusts the reputation of `peer_id` by `change.value`, logging `change.reason`.
    fn report_peer(&self, peer_id: crate::types::PeerId, change: ReputationChange);

    /// Records `role` as the role of `peer_id`, creating an entry for the peer if needed.
    fn set_peer_role(&self, peer_id: &crate::types::PeerId, role: ObservedRole);

    /// Returns the reputation of `peer_id`, or `0` for an unknown peer.
    fn peer_reputation(&self, peer_id: &crate::types::PeerId) -> i32;

    /// Returns the recorded role of `peer_id`, or `None` if the peer or its role is unknown.
    fn peer_role(&self, peer_id: &crate::types::PeerId) -> Option<ObservedRole>;

    /// Returns at most `count` known peers that are neither banned nor listed in `ignored`,
    /// preferring peers with higher reputation.
    fn outgoing_candidates(
        &self,
        count: usize,
        ignored: HashSet<crate::types::PeerId>,
    ) -> Vec<crate::types::PeerId>;

    /// Adds `peer_id` to the store, or refreshes its `last_updated` instant if already known.
    fn add_known_peer(&self, peer_id: crate::types::PeerId);
}
90
/// Cloneable handle to the peer store state.
///
/// Cloning copies the `Arc`, so every clone operates on the same mutex-protected
/// `PeerStoreInner`.
#[derive(Debug, Clone)]
pub struct PeerStoreHandle {
    /// Shared, mutex-protected store state.
    inner: Arc<Mutex<PeerStoreInner>>,
}
96
97impl PeerStoreProvider for PeerStoreHandle {
98 fn is_banned(&self, peer_id: &crate::types::PeerId) -> bool {
99 self.inner.lock().is_banned(&peer_id.into())
100 }
101
102 fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandle>) {
103 self.inner.lock().register_protocol(protocol_handle);
104 }
105
106 fn report_disconnect(&self, peer_id: crate::types::PeerId) {
107 let mut inner = self.inner.lock();
108 inner.report_disconnect(peer_id.into())
109 }
110
111 fn report_peer(&self, peer_id: crate::types::PeerId, change: ReputationChange) {
112 let mut inner = self.inner.lock();
113 inner.report_peer(peer_id.into(), change)
114 }
115
116 fn set_peer_role(&self, peer_id: &crate::types::PeerId, role: ObservedRole) {
117 let mut inner = self.inner.lock();
118 inner.set_peer_role(&peer_id.into(), role)
119 }
120
121 fn peer_reputation(&self, peer_id: &crate::types::PeerId) -> i32 {
122 self.inner.lock().peer_reputation(&peer_id.into())
123 }
124
125 fn peer_role(&self, peer_id: &crate::types::PeerId) -> Option<ObservedRole> {
126 self.inner.lock().peer_role(&peer_id.into())
127 }
128
129 fn outgoing_candidates(
130 &self,
131 count: usize,
132 ignored: HashSet<crate::types::PeerId>,
133 ) -> Vec<crate::types::PeerId> {
134 self.inner
135 .lock()
136 .outgoing_candidates(count, ignored.iter().map(|peer_id| (*peer_id).into()).collect())
137 .iter()
138 .map(|peer_id| peer_id.into())
139 .collect()
140 }
141
142 fn add_known_peer(&self, peer_id: crate::types::PeerId) {
143 self.inner.lock().add_known_peer(peer_id.into());
144 }
145}
146
/// Record the store keeps for each known peer.
#[derive(Debug, Clone, Copy)]
struct PeerInfo {
    /// Reputation of the peer; compared against `BANNED_THRESHOLD` to decide whether it is
    /// banned and used to order outgoing candidates.
    reputation: i32,

    /// Instant of the last explicit update: set on creation, by `add_reputation` and by
    /// `bump_last_updated`, but not by `decay_reputation`.
    last_updated: Instant,

    /// Role of the peer, if one has been recorded.
    role: Option<ObservedRole>,
}
158
159impl Default for PeerInfo {
160 fn default() -> Self {
161 Self { reputation: 0, last_updated: Instant::now(), role: None }
162 }
163}
164
165impl PartialEq for PeerInfo {
166 fn eq(&self, other: &Self) -> bool {
167 self.reputation == other.reputation
168 }
169}
170
171impl Eq for PeerInfo {}
172
173impl Ord for PeerInfo {
174 fn cmp(&self, other: &Self) -> Ordering {
176 self.reputation.cmp(&other.reputation).reverse()
177 }
178}
179
180impl PartialOrd for PeerInfo {
181 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
182 Some(self.cmp(other))
183 }
184}
185
186impl PeerInfo {
187 fn is_banned(&self) -> bool {
188 self.reputation < BANNED_THRESHOLD
189 }
190
191 fn add_reputation(&mut self, increment: i32) {
192 self.reputation = self.reputation.saturating_add(increment);
193 self.bump_last_updated();
194 }
195
196 fn decay_reputation(&mut self, seconds_passed: u64) {
197 for _ in 0..seconds_passed {
200 let mut diff = self.reputation / INVERSE_DECREMENT;
201 if diff == 0 && self.reputation < 0 {
202 diff = -1;
203 } else if diff == 0 && self.reputation > 0 {
204 diff = 1;
205 }
206
207 self.reputation = self.reputation.saturating_sub(diff);
208
209 if self.reputation == 0 {
210 break;
211 }
212 }
213 }
214
215 fn bump_last_updated(&mut self) {
216 self.last_updated = Instant::now();
217 }
218}
219
/// State kept behind the mutex: known peers, registered protocols and optional metrics.
#[derive(Debug)]
struct PeerStoreInner {
    /// Records of all known peers, keyed by peer id.
    peers: HashMap<PeerId, PeerInfo>,
    /// Protocol handles that are asked to disconnect a peer when a report leaves it banned.
    protocols: Vec<Arc<dyn ProtocolHandle>>,
    /// Metrics refreshed by `progress_time`, if any were registered.
    metrics: Option<PeerStoreMetrics>,
}
226
227impl PeerStoreInner {
228 fn is_banned(&self, peer_id: &PeerId) -> bool {
229 self.peers.get(peer_id).map_or(false, |info| info.is_banned())
230 }
231
232 fn register_protocol(&mut self, protocol_handle: Arc<dyn ProtocolHandle>) {
233 self.protocols.push(protocol_handle);
234 }
235
236 fn report_disconnect(&mut self, peer_id: PeerId) {
237 let peer_info = self.peers.entry(peer_id).or_default();
238 peer_info.add_reputation(DISCONNECT_REPUTATION_CHANGE);
239
240 log::trace!(
241 target: LOG_TARGET,
242 "Peer {} disconnected, reputation: {:+} to {}",
243 peer_id,
244 DISCONNECT_REPUTATION_CHANGE,
245 peer_info.reputation,
246 );
247 }
248
249 fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange) {
250 let peer_info = self.peers.entry(peer_id).or_default();
251 let was_banned = peer_info.is_banned();
252 peer_info.add_reputation(change.value);
253
254 log::trace!(
255 target: LOG_TARGET,
256 "Report {}: {:+} to {}. Reason: {}.",
257 peer_id,
258 change.value,
259 peer_info.reputation,
260 change.reason,
261 );
262
263 if !peer_info.is_banned() {
264 if was_banned {
265 log::info!(
266 target: LOG_TARGET,
267 "Peer {} is now unbanned: {:+} to {}. Reason: {}.",
268 peer_id,
269 change.value,
270 peer_info.reputation,
271 change.reason,
272 );
273 }
274 return;
275 }
276
277 self.protocols.iter().for_each(|handle| handle.disconnect_peer(peer_id.into()));
279
280 if !was_banned {
282 log::warn!(
283 target: LOG_TARGET,
284 "Report {}: {:+} to {}. Reason: {}. Banned, disconnecting.",
285 peer_id,
286 change.value,
287 peer_info.reputation,
288 change.reason,
289 );
290 return;
291 }
292
293 if change.value < 0 {
296 log::debug!(
297 target: LOG_TARGET,
298 "Report {}: {:+} to {}. Reason: {}. Misbehaved during the ban threshold.",
299 peer_id,
300 change.value,
301 peer_info.reputation,
302 change.reason,
303 );
304 }
305 }
306
307 fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole) {
308 log::trace!(target: LOG_TARGET, "Set {peer_id} role to {role:?}");
309
310 match self.peers.entry(*peer_id) {
311 Entry::Occupied(mut entry) => {
312 entry.get_mut().role = Some(role);
313 },
314 Entry::Vacant(entry) => {
315 entry.insert(PeerInfo { role: Some(role), ..Default::default() });
316 },
317 }
318 }
319
320 fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
321 self.peers.get(peer_id).map_or(0, |info| info.reputation)
322 }
323
324 fn peer_role(&self, peer_id: &PeerId) -> Option<ObservedRole> {
325 self.peers.get(peer_id).map_or(None, |info| info.role)
326 }
327
328 fn outgoing_candidates(&self, count: usize, ignored: HashSet<PeerId>) -> Vec<PeerId> {
329 let mut candidates = self
330 .peers
331 .iter()
332 .filter_map(|(peer_id, info)| {
333 (!info.is_banned() && !ignored.contains(peer_id)).then_some((*peer_id, *info))
334 })
335 .collect::<Vec<_>>();
336 let count = std::cmp::min(count, candidates.len());
337 candidates.partial_sort(count, |(_, info1), (_, info2)| info1.cmp(info2));
338 candidates.iter().take(count).map(|(peer_id, _)| *peer_id).collect()
339
340 }
342
343 fn progress_time(&mut self, seconds_passed: u64) {
344 if seconds_passed == 0 {
345 return;
346 }
347
348 self.peers
350 .iter_mut()
351 .for_each(|(_, info)| info.decay_reputation(seconds_passed));
352
353 let now = Instant::now();
355 let mut num_banned_peers: u64 = 0;
356 self.peers.retain(|_, info| {
357 if info.is_banned() {
358 num_banned_peers += 1;
359 }
360
361 info.reputation != 0 || info.last_updated + FORGET_AFTER > now
362 });
363
364 if let Some(metrics) = &self.metrics {
365 metrics.num_discovered.set(self.peers.len() as u64);
366 metrics.num_banned_peers.set(num_banned_peers);
367 }
368 }
369
370 fn add_known_peer(&mut self, peer_id: PeerId) {
371 match self.peers.entry(peer_id) {
372 Entry::Occupied(mut e) => {
373 trace!(
374 target: LOG_TARGET,
375 "Trying to add an already known peer {peer_id}, bumping `last_updated`.",
376 );
377 e.get_mut().bump_last_updated();
378 },
379 Entry::Vacant(e) => {
380 trace!(target: LOG_TARGET, "Adding a new known peer {peer_id}.");
381 e.insert(PeerInfo::default());
382 },
383 }
384 }
385}
386
/// Owner of the peer store state.
///
/// `handle` hands out `PeerStoreHandle`s that share this state, and `run` periodically advances
/// it.
#[derive(Debug)]
pub struct PeerStore {
    /// State shared with every `PeerStoreHandle` created from this store.
    inner: Arc<Mutex<PeerStoreInner>>,
}
392
393impl PeerStore {
394 pub fn new(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self {
396 let metrics = if let Some(registry) = &metrics_registry {
397 PeerStoreMetrics::register(registry)
398 .map_err(|err| {
399 log::error!(target: LOG_TARGET, "Failed to register peer set metrics: {}", err);
400 err
401 })
402 .ok()
403 } else {
404 None
405 };
406
407 PeerStore {
408 inner: Arc::new(Mutex::new(PeerStoreInner {
409 peers: bootnodes
410 .into_iter()
411 .map(|peer_id| (peer_id, PeerInfo::default()))
412 .collect(),
413 protocols: Vec::new(),
414 metrics,
415 })),
416 }
417 }
418
419 pub fn handle(&self) -> PeerStoreHandle {
421 PeerStoreHandle { inner: self.inner.clone() }
422 }
423
424 pub async fn run(self) {
426 let started = Instant::now();
427 let mut latest_time_update = started;
428
429 loop {
430 let now = Instant::now();
431 let seconds_passed = {
434 let elapsed_latest = latest_time_update - started;
435 let elapsed_now = now - started;
436 latest_time_update = now;
437 elapsed_now.as_secs() - elapsed_latest.as_secs()
438 };
439
440 self.inner.lock().progress_time(seconds_passed);
441 let _ = Delay::new(Duration::from_secs(1)).await;
442 }
443 }
444}
445
446#[async_trait::async_trait]
447impl PeerStoreT for PeerStore {
448 fn handle(&self) -> Arc<dyn PeerStoreProvider> {
449 Arc::new(self.handle())
450 }
451
452 async fn run(self) {
453 self.run().await;
454 }
455}
456
#[cfg(test)]
mod tests {
    use super::{PeerInfo, PeerStore, PeerStoreProvider};
    use crate::common::types::ReputationChange;

    /// Builds a peer record with the given reputation and default values for everything else.
    fn peer_with_reputation(reputation: i32) -> PeerInfo {
        PeerInfo { reputation, ..PeerInfo::default() }
    }

    #[test]
    fn decaying_zero_reputation_yields_zero() {
        let mut info = PeerInfo::default();
        assert_eq!(info.reputation, 0);

        // Neither a single second nor a very long time may move a neutral reputation.
        for seconds in [1u64, 100_000] {
            info.decay_reputation(seconds);
            assert_eq!(info.reputation, 0);
        }
    }

    #[test]
    fn decaying_positive_reputation_decreases_it() {
        const INITIAL_REPUTATION: i32 = 100;

        let mut info = peer_with_reputation(INITIAL_REPUTATION);
        info.decay_reputation(1);

        assert!(info.reputation >= 0);
        assert!(info.reputation < INITIAL_REPUTATION);
    }

    #[test]
    fn decaying_negative_reputation_increases_it() {
        const INITIAL_REPUTATION: i32 = -100;

        let mut info = peer_with_reputation(INITIAL_REPUTATION);
        info.decay_reputation(1);

        assert!(info.reputation <= 0);
        assert!(info.reputation > INITIAL_REPUTATION);
    }

    #[test]
    fn decaying_max_reputation_finally_yields_zero() {
        const INITIAL_REPUTATION: i32 = i32::MAX;
        const SECONDS: u64 = 3544;

        let mut info = peer_with_reputation(INITIAL_REPUTATION);

        // Half-way through the reputation is still positive ...
        info.decay_reputation(SECONDS / 2);
        assert!(info.reputation > 0);

        // ... and after the full period it has reached zero.
        info.decay_reputation(SECONDS / 2);
        assert_eq!(info.reputation, 0);
    }

    #[test]
    fn decaying_min_reputation_finally_yields_zero() {
        const INITIAL_REPUTATION: i32 = i32::MIN;
        const SECONDS: u64 = 3544;

        let mut info = peer_with_reputation(INITIAL_REPUTATION);

        // Half-way through the reputation is still negative ...
        info.decay_reputation(SECONDS / 2);
        assert!(info.reputation < 0);

        // ... and after the full period it has reached zero.
        info.decay_reputation(SECONDS / 2);
        assert_eq!(info.reputation, 0);
    }

    #[test]
    fn report_banned_peers() {
        let peer_a = crate::types::PeerId::random();
        let peer_b = crate::types::PeerId::random();
        let peer_c = crate::types::PeerId::random();

        let bootnodes = vec![peer_a, peer_b, peer_c].into_iter().map(Into::into).collect();
        let peerstore = PeerStore::new(bootnodes, Some(soil_prometheus::Registry::new()));
        let metrics = peerstore.inner.lock().metrics.as_ref().unwrap().clone();
        let handle = peerstore.handle();

        // Initial state: advancing time publishes the peer counts to the metrics.
        handle.inner.lock().progress_time(1);
        assert_eq!(metrics.num_discovered.get(), 3);
        assert_eq!(metrics.num_banned_peers.get(), 0);

        // Push two of the three peers far below the ban threshold.
        for peer in [peer_a, peer_b] {
            handle.report_peer(peer, ReputationChange { value: i32::MIN, reason: "test".into() });
        }

        // Advancing time again publishes the banned peers.
        handle.inner.lock().progress_time(1);
        assert_eq!(metrics.num_discovered.get(), 3);
        assert_eq!(metrics.num_banned_peers.get(), 2);
    }
}