1mod peersstate;
36
37use futures::{channel::oneshot, prelude::*};
38use log::{debug, error, trace};
39use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
40use serde_json::json;
41use std::{
42 collections::{HashMap, HashSet, VecDeque},
43 pin::Pin,
44 task::{Context, Poll},
45 time::{Duration, Instant},
46};
47use wasm_timer::Delay;
48
49pub use libp2p::PeerId;
50
/// Reputation value below which the peerset treats a node as banned: incoming
/// requests from it are rejected and it is disconnected when reported.
pub const BANNED_THRESHOLD: i32 = 82 * (i32::MIN / 100);

/// Reputation adjustment applied to a connected peer when `dropped` is called
/// for it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -256;

/// How long after a non-connected peer was last connected or discovered it
/// becomes eligible to be forgotten (once its reputation has decayed to zero).
const FORGET_AFTER: Duration = Duration::from_secs(60 * 60);
58
59#[derive(Debug)]
60enum Action {
61 AddReservedPeer(SetId, PeerId),
62 RemoveReservedPeer(SetId, PeerId),
63 SetReservedPeers(SetId, HashSet<PeerId>),
64 SetReservedOnly(SetId, bool),
65 ReportPeer(PeerId, ReputationChange),
66 AddToPeersSet(SetId, PeerId),
67 RemoveFromPeersSet(SetId, PeerId),
68 PeerReputation(PeerId, oneshot::Sender<i32>),
69}
70
/// Identifier of a set within the peerset.
///
/// Wraps the index used to address per-set state.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);

impl SetId {
    /// Wraps a raw index. Being a `const fn`, this can be used in constant
    /// contexts, unlike the `From<usize>` implementation.
    pub const fn from(id: usize) -> Self {
        SetId(id)
    }
}

impl From<usize> for SetId {
    /// Wraps a raw index.
    fn from(id: usize) -> Self {
        SetId(id)
    }
}

impl From<SetId> for usize {
    /// Extracts the raw index.
    fn from(id: SetId) -> Self {
        let SetId(index) = id;
        index
    }
}
97
/// A reputation adjustment together with the reason it is applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReputationChange {
    /// Amount added to the peer's reputation.
    pub value: i32,
    /// Human-readable reason, included in log output when the change is applied.
    pub reason: &'static str,
}

impl ReputationChange {
    /// Creates a change of `value` with the given `reason`.
    pub const fn new(value: i32, reason: &'static str) -> ReputationChange {
        ReputationChange { value, reason }
    }

    /// Creates a change whose value is `i32::MIN`.
    pub const fn new_fatal(reason: &'static str) -> ReputationChange {
        ReputationChange::new(i32::MIN, reason)
    }
}
118
119#[derive(Debug, Clone)]
121pub struct PeersetHandle {
122 tx: TracingUnboundedSender<Action>,
123}
124
125impl PeersetHandle {
126 pub fn add_reserved_peer(&self, set_id: SetId, peer_id: PeerId) {
134 let _ = self.tx.unbounded_send(Action::AddReservedPeer(set_id, peer_id));
135 }
136
137 pub fn remove_reserved_peer(&self, set_id: SetId, peer_id: PeerId) {
141 let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(set_id, peer_id));
142 }
143
144 pub fn set_reserved_only(&self, set_id: SetId, reserved: bool) {
147 let _ = self.tx.unbounded_send(Action::SetReservedOnly(set_id, reserved));
148 }
149
150 pub fn set_reserved_peers(&self, set_id: SetId, peer_ids: HashSet<PeerId>) {
152 let _ = self.tx.unbounded_send(Action::SetReservedPeers(set_id, peer_ids));
153 }
154
155 pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) {
157 let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
158 }
159
160 pub fn add_to_peers_set(&self, set_id: SetId, peer_id: PeerId) {
162 let _ = self.tx.unbounded_send(Action::AddToPeersSet(set_id, peer_id));
163 }
164
165 pub fn remove_from_peers_set(&self, set_id: SetId, peer_id: PeerId) {
167 let _ = self.tx.unbounded_send(Action::RemoveFromPeersSet(set_id, peer_id));
168 }
169
170 pub async fn peer_reputation(self, peer_id: PeerId) -> Result<i32, ()> {
172 let (tx, rx) = oneshot::channel();
173
174 let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx));
175
176 rx.await.map_err(|_| ())
178 }
179}
180
181#[derive(Debug, PartialEq)]
183pub enum Message {
184 Connect {
187 set_id: SetId,
188 peer_id: PeerId,
190 },
191
192 Drop {
194 set_id: SetId,
195 peer_id: PeerId,
197 },
198
199 Accept(IncomingIndex),
201
202 Reject(IncomingIndex),
204}
205
/// Opaque identifier passed to [`Peerset::incoming`] and echoed back in the
/// matching `Accept` or `Reject` message.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct IncomingIndex(pub u64);

impl From<u64> for IncomingIndex {
    /// Wraps a raw `u64`.
    fn from(val: u64) -> Self {
        IncomingIndex(val)
    }
}
215
216#[derive(Debug)]
218pub struct PeersetConfig {
219 pub sets: Vec<SetConfig>,
221}
222
223#[derive(Debug)]
225pub struct SetConfig {
226 pub in_peers: u32,
228
229 pub out_peers: u32,
231
232 pub bootnodes: Vec<PeerId>,
237
238 pub reserved_nodes: HashSet<PeerId>,
243
244 pub reserved_only: bool,
246}
247
248#[derive(Debug)]
253pub struct Peerset {
254 data: peersstate::PeersState,
256 reserved_nodes: Vec<(HashSet<PeerId>, bool)>,
260 rx: TracingUnboundedReceiver<Action>,
262 tx: TracingUnboundedSender<Action>,
264 message_queue: VecDeque<Message>,
266 created: Instant,
268 latest_time_update: Instant,
270 next_periodic_alloc_slots: Delay,
273}
274
275impl Peerset {
276 pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) {
278 let (tx, rx) = tracing_unbounded("mpsc_peerset_messages", 10_000);
279
280 let handle = PeersetHandle { tx: tx.clone() };
281
282 let mut peerset = {
283 let now = Instant::now();
284
285 Self {
286 data: peersstate::PeersState::new(config.sets.iter().map(|set| {
287 peersstate::SetConfig { in_peers: set.in_peers, out_peers: set.out_peers }
288 })),
289 tx,
290 rx,
291 reserved_nodes: config
292 .sets
293 .iter()
294 .map(|set| (set.reserved_nodes.clone(), set.reserved_only))
295 .collect(),
296 message_queue: VecDeque::new(),
297 created: now,
298 latest_time_update: now,
299 next_periodic_alloc_slots: Delay::new(Duration::new(0, 0)),
300 }
301 };
302
303 for (set, set_config) in config.sets.into_iter().enumerate() {
304 for node in set_config.reserved_nodes {
305 peerset.data.add_no_slot_node(set, node);
306 }
307
308 for peer_id in set_config.bootnodes {
309 if let peersstate::Peer::Unknown(entry) = peerset.data.peer(set, &peer_id) {
310 entry.discover();
311 } else {
312 debug!(target: "peerset", "Duplicate bootnode in config: {:?}", peer_id);
313 }
314 }
315 }
316
317 for set_index in 0..peerset.data.num_sets() {
318 peerset.alloc_slots(SetId(set_index));
319 }
320
321 (peerset, handle)
322 }
323
324 fn on_add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) {
325 let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id);
326 if !newly_inserted {
327 return
328 }
329
330 self.data.add_no_slot_node(set_id.0, peer_id);
331 self.alloc_slots(set_id);
332 }
333
334 fn on_remove_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) {
335 if !self.reserved_nodes[set_id.0].0.remove(&peer_id) {
336 return
337 }
338
339 self.data.remove_no_slot_node(set_id.0, &peer_id);
340
341 if !self.reserved_nodes[set_id.0].1 {
343 return
344 }
345
346 if let peersstate::Peer::Connected(peer) = self.data.peer(set_id.0, &peer_id) {
349 peer.disconnect();
350 self.message_queue.push_back(Message::Drop { set_id, peer_id });
351 }
352 }
353
354 fn on_set_reserved_peers(&mut self, set_id: SetId, peer_ids: HashSet<PeerId>) {
355 let (to_insert, to_remove) = {
357 let to_insert = peer_ids
358 .difference(&self.reserved_nodes[set_id.0].0)
359 .cloned()
360 .collect::<Vec<_>>();
361 let to_remove = self.reserved_nodes[set_id.0]
362 .0
363 .difference(&peer_ids)
364 .cloned()
365 .collect::<Vec<_>>();
366 (to_insert, to_remove)
367 };
368
369 for node in to_insert {
370 self.on_add_reserved_peer(set_id, node);
371 }
372
373 for node in to_remove {
374 self.on_remove_reserved_peer(set_id, node);
375 }
376 }
377
378 fn on_set_reserved_only(&mut self, set_id: SetId, reserved_only: bool) {
379 self.reserved_nodes[set_id.0].1 = reserved_only;
380
381 if reserved_only {
382 for peer_id in
384 self.data.connected_peers(set_id.0).cloned().collect::<Vec<_>>().into_iter()
385 {
386 if self.reserved_nodes[set_id.0].0.contains(&peer_id) {
387 continue
388 }
389
390 let peer = self.data.peer(set_id.0, &peer_id).into_connected().expect(
391 "We are enumerating connected peers, therefore the peer is connected; qed",
392 );
393 peer.disconnect();
394 self.message_queue.push_back(Message::Drop { set_id, peer_id });
395 }
396 } else {
397 self.alloc_slots(set_id);
398 }
399 }
400
401 pub fn reserved_peers(&self, set_id: SetId) -> impl Iterator<Item = &PeerId> {
403 self.reserved_nodes[set_id.0].0.iter()
404 }
405
406 pub fn add_to_peers_set(&mut self, set_id: SetId, peer_id: PeerId) {
411 if let peersstate::Peer::Unknown(entry) = self.data.peer(set_id.0, &peer_id) {
412 entry.discover();
413 self.alloc_slots(set_id);
414 }
415 }
416
417 fn on_remove_from_peers_set(&mut self, set_id: SetId, peer_id: PeerId) {
418 if self.reserved_nodes[set_id.0].0.contains(&peer_id) {
420 return
421 }
422
423 match self.data.peer(set_id.0, &peer_id) {
424 peersstate::Peer::Connected(peer) => {
425 self.message_queue.push_back(Message::Drop { set_id, peer_id: *peer.peer_id() });
426 peer.disconnect().forget_peer();
427 },
428 peersstate::Peer::NotConnected(peer) => {
429 peer.forget_peer();
430 },
431 peersstate::Peer::Unknown(_) => {},
432 }
433 }
434
435 fn on_report_peer(&mut self, peer_id: PeerId, change: ReputationChange) {
436 self.update_time();
438
439 let mut reputation = self.data.peer_reputation(peer_id);
440 reputation.add_reputation(change.value);
441 if reputation.reputation() >= BANNED_THRESHOLD {
442 trace!(target: "peerset", "Report {}: {:+} to {}. Reason: {}",
443 peer_id, change.value, reputation.reputation(), change.reason
444 );
445 return
446 }
447
448 debug!(target: "peerset", "Report {}: {:+} to {}. Reason: {}, Disconnecting",
449 peer_id, change.value, reputation.reputation(), change.reason
450 );
451
452 drop(reputation);
453
454 for set_index in 0..self.data.num_sets() {
455 if let peersstate::Peer::Connected(peer) = self.data.peer(set_index, &peer_id) {
456 let peer = peer.disconnect();
457 self.message_queue.push_back(Message::Drop {
458 set_id: SetId(set_index),
459 peer_id: peer.into_peer_id(),
460 });
461
462 self.alloc_slots(SetId(set_index));
463 }
464 }
465 }
466
467 fn on_peer_reputation(&mut self, peer_id: PeerId, pending_response: oneshot::Sender<i32>) {
468 let reputation = self.data.peer_reputation(peer_id);
469 let _ = pending_response.send(reputation.reputation());
470 }
471
472 fn update_time(&mut self) {
475 let now = Instant::now();
476
477 let secs_diff = {
480 let elapsed_latest = self.latest_time_update - self.created;
481 let elapsed_now = now - self.created;
482 self.latest_time_update = now;
483 elapsed_now.as_secs() - elapsed_latest.as_secs()
484 };
485
486 for _ in 0..secs_diff {
491 for peer_id in self.data.peers().cloned().collect::<Vec<_>>() {
492 fn reput_tick(reput: i32) -> i32 {
495 let mut diff = reput / 50;
496 if diff == 0 && reput < 0 {
497 diff = -1;
498 } else if diff == 0 && reput > 0 {
499 diff = 1;
500 }
501 reput.saturating_sub(diff)
502 }
503
504 let mut peer_reputation = self.data.peer_reputation(peer_id);
505
506 let before = peer_reputation.reputation();
507 let after = reput_tick(before);
508 trace!(target: "peerset", "Fleeting {}: {} -> {}", peer_id, before, after);
509 peer_reputation.set_reputation(after);
510
511 if after != 0 {
512 continue
513 }
514
515 drop(peer_reputation);
516
517 for set_index in 0..self.data.num_sets() {
520 match self.data.peer(set_index, &peer_id) {
521 peersstate::Peer::Connected(_) => {},
522 peersstate::Peer::NotConnected(peer) => {
523 if peer.last_connected_or_discovered() + FORGET_AFTER < now {
524 peer.forget_peer();
525 }
526 },
527 peersstate::Peer::Unknown(_) => {
528 },
530 }
531 }
532 }
533 }
534 }
535
536 fn alloc_slots(&mut self, set_id: SetId) {
538 self.update_time();
539
540 for reserved_node in &self.reserved_nodes[set_id.0].0 {
542 let entry = match self.data.peer(set_id.0, reserved_node) {
543 peersstate::Peer::Unknown(n) => n.discover(),
544 peersstate::Peer::NotConnected(n) => n,
545 peersstate::Peer::Connected(_) => continue,
546 };
547
548 if entry.reputation() < BANNED_THRESHOLD {
553 break
554 }
555
556 match entry.try_outgoing() {
557 Ok(conn) => self
558 .message_queue
559 .push_back(Message::Connect { set_id, peer_id: conn.into_peer_id() }),
560 Err(_) => {
561 debug_assert!(false);
565 log::error!(
566 target: "peerset",
567 "Not enough slots to connect to reserved node"
568 );
569 },
570 }
571 }
572
573 if self.reserved_nodes[set_id.0].1 {
577 return
578 }
579
580 while self.data.has_free_outgoing_slot(set_id.0) {
584 let next = match self.data.highest_not_connected_peer(set_id.0) {
585 Some(n) => n,
586 None => break,
587 };
588
589 if next.reputation() < BANNED_THRESHOLD {
591 break
592 }
593
594 match next.try_outgoing() {
595 Ok(conn) => self
596 .message_queue
597 .push_back(Message::Connect { set_id, peer_id: conn.into_peer_id() }),
598 Err(_) => {
599 debug_assert!(false);
602 break
603 },
604 }
605 }
606 }
607
608 pub fn incoming(&mut self, set_id: SetId, peer_id: PeerId, index: IncomingIndex) {
618 trace!(target: "peerset", "Incoming {:?}", peer_id);
619
620 self.update_time();
621
622 if self.reserved_nodes[set_id.0].1 && !self.reserved_nodes[set_id.0].0.contains(&peer_id) {
623 self.message_queue.push_back(Message::Reject(index));
624 return
625 }
626
627 let not_connected = match self.data.peer(set_id.0, &peer_id) {
628 peersstate::Peer::Connected(_) => return,
630 peersstate::Peer::NotConnected(mut entry) => {
631 entry.bump_last_connected_or_discovered();
632 entry
633 },
634 peersstate::Peer::Unknown(entry) => entry.discover(),
635 };
636
637 if not_connected.reputation() < BANNED_THRESHOLD {
638 self.message_queue.push_back(Message::Reject(index));
639 return
640 }
641
642 match not_connected.try_accept_incoming() {
643 Ok(_) => self.message_queue.push_back(Message::Accept(index)),
644 Err(_) => self.message_queue.push_back(Message::Reject(index)),
645 }
646 }
647
648 pub fn dropped(&mut self, set_id: SetId, peer_id: PeerId, reason: DropReason) {
653 self.update_time();
655
656 match self.data.peer(set_id.0, &peer_id) {
657 peersstate::Peer::Connected(mut entry) => {
658 entry.add_reputation(DISCONNECT_REPUTATION_CHANGE);
660 trace!(target: "peerset", "Dropping {}: {:+} to {}",
661 peer_id, DISCONNECT_REPUTATION_CHANGE, entry.reputation());
662 entry.disconnect();
663 },
664 peersstate::Peer::NotConnected(_) | peersstate::Peer::Unknown(_) => {
665 error!(target: "peerset", "Received dropped() for non-connected node")
666 },
667 }
668
669 if let DropReason::Refused = reason {
670 self.on_remove_from_peers_set(set_id, peer_id);
671 }
672
673 self.alloc_slots(set_id);
674 }
675
676 pub fn report_peer(&mut self, peer_id: PeerId, score_diff: ReputationChange) {
678 let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff));
682 }
683
684 pub fn debug_info(&mut self) -> serde_json::Value {
686 self.update_time();
687
688 json!({
689 "sets": (0..self.data.num_sets()).map(|set_index| {
690 json!({
691 "nodes": self.data.peers().cloned().collect::<Vec<_>>().into_iter().filter_map(|peer_id| {
692 let state = match self.data.peer(set_index, &peer_id) {
693 peersstate::Peer::Connected(entry) => json!({
694 "connected": true,
695 "reputation": entry.reputation()
696 }),
697 peersstate::Peer::NotConnected(entry) => json!({
698 "connected": false,
699 "reputation": entry.reputation()
700 }),
701 peersstate::Peer::Unknown(_) => return None,
702 };
703
704 Some((peer_id.to_base58(), state))
705 }).collect::<HashMap<_, _>>(),
706 "reserved_nodes": self.reserved_nodes[set_index].0.iter().map(|peer_id| {
707 peer_id.to_base58()
708 }).collect::<HashSet<_>>(),
709 "reserved_only": self.reserved_nodes[set_index].1,
710 })
711 }).collect::<Vec<_>>(),
712 "message_queue": self.message_queue.len(),
713 })
714 }
715
716 pub fn num_discovered_peers(&self) -> usize {
718 self.data.peers().len()
719 }
720}
721
722impl Stream for Peerset {
723 type Item = Message;
724
725 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
726 loop {
727 if let Some(message) = self.message_queue.pop_front() {
728 return Poll::Ready(Some(message))
729 }
730
731 if Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx).is_ready() {
732 self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0));
733
734 for set_index in 0..self.data.num_sets() {
735 self.alloc_slots(SetId(set_index));
736 }
737 }
738
739 let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
740 Poll::Pending => return Poll::Pending,
741 Poll::Ready(Some(event)) => event,
742 Poll::Ready(None) => return Poll::Pending,
743 };
744
745 match action {
746 Action::AddReservedPeer(set_id, peer_id) =>
747 self.on_add_reserved_peer(set_id, peer_id),
748 Action::RemoveReservedPeer(set_id, peer_id) =>
749 self.on_remove_reserved_peer(set_id, peer_id),
750 Action::SetReservedPeers(set_id, peer_ids) =>
751 self.on_set_reserved_peers(set_id, peer_ids),
752 Action::SetReservedOnly(set_id, reserved) =>
753 self.on_set_reserved_only(set_id, reserved),
754 Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff),
755 Action::AddToPeersSet(sets_name, peer_id) =>
756 self.add_to_peers_set(sets_name, peer_id),
757 Action::RemoveFromPeersSet(sets_name, peer_id) =>
758 self.on_remove_from_peers_set(sets_name, peer_id),
759 Action::PeerReputation(peer_id, pending_response) =>
760 self.on_peer_reputation(peer_id, pending_response),
761 }
762 }
763 }
764}
765
/// Reason passed to [`Peerset::dropped`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropReason {
    /// No specific reason is known. The peer stays in the set.
    Unknown,
    /// The connection was refused. In addition to the regular handling,
    /// `Peerset::dropped` removes the peer from the set.
    Refused,
}
776
777#[cfg(test)]
778mod tests {
779 use super::{
780 IncomingIndex, Message, Peerset, PeersetConfig, ReputationChange, SetConfig, SetId,
781 BANNED_THRESHOLD,
782 };
783 use futures::prelude::*;
784 use libp2p::PeerId;
785 use std::{pin::Pin, task::Poll, thread, time::Duration};
786
787 fn assert_messages(mut peerset: Peerset, messages: Vec<Message>) -> Peerset {
788 for expected_message in messages {
789 let (message, p) = next_message(peerset).expect("expected message");
790 assert_eq!(message, expected_message);
791 peerset = p;
792 }
793 peerset
794 }
795
796 fn next_message(mut peerset: Peerset) -> Result<(Message, Peerset), ()> {
797 let next = futures::executor::block_on_stream(&mut peerset).next();
798 let message = next.ok_or(())?;
799 Ok((message, peerset))
800 }
801
/// In reserved-only mode, reserved peers added through the handle are
/// connected to, in the order they were added; the bootnode is not.
#[test]
fn test_peerset_add_reserved_peer() {
    let set = SetId::from(0);
    let bootnode = PeerId::random();
    let reserved_peer = PeerId::random();
    let reserved_peer2 = PeerId::random();

    let (peerset, handle) = Peerset::from_config(PeersetConfig {
        sets: vec![SetConfig {
            in_peers: 0,
            out_peers: 2,
            bootnodes: vec![bootnode],
            reserved_nodes: Default::default(),
            reserved_only: true,
        }],
    });

    for peer in [reserved_peer, reserved_peer2].iter() {
        handle.add_reserved_peer(set, *peer);
    }

    let expected = vec![
        Message::Connect { set_id: set, peer_id: reserved_peer },
        Message::Connect { set_id: set, peer_id: reserved_peer2 },
    ];
    assert_messages(peerset, expected);
}
829
/// With two incoming slots, the first two distinct peers are accepted and
/// the third is rejected. A repeated request from an already-connected peer
/// gets no answer.
#[test]
fn test_peerset_incoming() {
    let set = SetId::from(0);
    let bootnode = PeerId::random();
    let incoming = PeerId::random();
    let incoming2 = PeerId::random();
    let incoming3 = PeerId::random();
    let (ii, ii2, ii3) = (IncomingIndex(1), IncomingIndex(2), IncomingIndex(3));
    // Used for the repeated request from `incoming`; it carries the same
    // value as `ii3`.
    let ii4 = IncomingIndex(3);

    let (mut peerset, _handle) = Peerset::from_config(PeersetConfig {
        sets: vec![SetConfig {
            in_peers: 2,
            out_peers: 1,
            bootnodes: vec![bootnode],
            reserved_nodes: Default::default(),
            reserved_only: false,
        }],
    });

    let requests = [(incoming, ii), (incoming, ii4), (incoming2, ii2), (incoming3, ii3)];
    for (peer, index) in requests.iter() {
        peerset.incoming(set, *peer, *index);
    }

    let expected = vec![
        Message::Connect { set_id: set, peer_id: bootnode },
        Message::Accept(ii),
        Message::Accept(ii2),
        Message::Reject(ii3),
    ];
    assert_messages(peerset, expected);
}
866
/// In reserved-only mode an incoming request from a non-reserved peer is
/// rejected even though slots are available.
#[test]
fn test_peerset_reject_incoming_in_reserved_only() {
    let set = SetId::from(0);
    let incoming = PeerId::random();
    let ii = IncomingIndex(1);

    let (mut peerset, _) = Peerset::from_config(PeersetConfig {
        sets: vec![SetConfig {
            in_peers: 50,
            out_peers: 50,
            bootnodes: vec![],
            reserved_nodes: Default::default(),
            reserved_only: true,
        }],
    });
    peerset.incoming(set, incoming, ii);

    let expected = vec![Message::Reject(ii)];
    assert_messages(peerset, expected);
}
886
/// With two outgoing slots, the bootnode and the first discovered peer are
/// connected to; adding the same peer twice has no extra effect.
#[test]
fn test_peerset_discovered() {
    let set = SetId::from(0);
    let bootnode = PeerId::random();
    let discovered = PeerId::random();
    let discovered2 = PeerId::random();

    let (mut peerset, _handle) = Peerset::from_config(PeersetConfig {
        sets: vec![SetConfig {
            in_peers: 0,
            out_peers: 2,
            bootnodes: vec![bootnode],
            reserved_nodes: Default::default(),
            reserved_only: false,
        }],
    });

    for peer in [discovered, discovered, discovered2].iter() {
        peerset.add_to_peers_set(set, *peer);
    }

    let expected = vec![
        Message::Connect { set_id: set, peer_id: bootnode },
        Message::Connect { set_id: set, peer_id: discovered },
    ];
    assert_messages(peerset, expected);
}
915
/// A peer reported below the ban threshold is rejected at first, and
/// accepted again once its reputation has decayed back above the threshold.
#[test]
fn test_peerset_banned() {
    let set = SetId::from(0);
    let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
        sets: vec![SetConfig {
            in_peers: 25,
            out_peers: 25,
            bootnodes: vec![],
            reserved_nodes: Default::default(),
            reserved_only: false,
        }],
    });

    // Push the peer's reputation just below the ban threshold.
    let peer_id = PeerId::random();
    handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));

    let fut = futures::future::poll_fn(move |cx| {
        // One poll is needed for the report to be processed.
        assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);

        // While banned, an incoming request is refused.
        peerset.incoming(set, peer_id, IncomingIndex(1));
        match Stream::poll_next(Pin::new(&mut peerset), cx) {
            Poll::Ready(msg) => assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1))),
            Poll::Pending => panic!(),
        }

        // Give the reputation time to decay above the threshold.
        thread::sleep(Duration::from_millis(1500));

        // This time the request is accepted.
        peerset.incoming(set, peer_id, IncomingIndex(2));
        while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
            assert_eq!(msg.unwrap(), Message::Accept(IncomingIndex(2)));
        }

        Poll::Ready(())
    });

    futures::executor::block_on(fut);
}
958
/// A banned peer is rejected when it connects; any further message produced
/// by the peerset must be an outgoing connection attempt to that same peer.
#[test]
fn test_relloc_after_banned() {
    let set = SetId::from(0);
    let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
        sets: vec![SetConfig {
            in_peers: 25,
            out_peers: 25,
            bootnodes: vec![],
            reserved_nodes: Default::default(),
            reserved_only: false,
        }],
    });

    // Push the peer's reputation just below the ban threshold.
    let peer_id = PeerId::random();
    handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));

    let fut = futures::future::poll_fn(move |cx| {
        // One poll is needed for the report to be processed.
        assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);

        // While banned, an incoming request is refused.
        peerset.incoming(set, peer_id, IncomingIndex(1));
        match Stream::poll_next(Pin::new(&mut peerset), cx) {
            Poll::Ready(msg) => assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1))),
            Poll::Pending => panic!(),
        }

        // Any remaining message must be a connection attempt to the peer.
        while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
            assert_eq!(msg.unwrap(), Message::Connect { set_id: set, peer_id });
        }

        Poll::Ready(())
    });

    futures::executor::block_on(fut);
}
999}