1use crate::peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT};
45
46use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
47use libp2p::PeerId;
48use log::{debug, error, trace, warn};
49use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
50use sp_arithmetic::traits::SaturatedConversion;
51use std::{
52 collections::{HashMap, HashSet},
53 sync::Arc,
54 time::{Duration, Instant},
55};
56use wasm_timer::Delay;
57
/// Log target passed as `target:` to every logging macro in this file.
pub const LOG_TARGET: &str = "peerset";
60
/// Identifier of a peer set, stored as a plain `usize`.
///
/// Can be built either through the inherent `const fn from` or through the
/// `From<usize>` implementation, and converted back with `usize::from`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);
68
69impl SetId {
70 pub const fn from(id: usize) -> Self {
72 Self(id)
73 }
74}
75
76impl From<usize> for SetId {
77 fn from(id: usize) -> Self {
78 Self(id)
79 }
80}
81
82impl From<SetId> for usize {
83 fn from(id: SetId) -> Self {
84 id.0
85 }
86}
87
/// Configuration consumed by `ProtocolController::new` for a single peer set.
#[derive(Debug)]
pub struct ProtoSetConfig {
    /// Upper bound on inbound slots; becomes the controller's `max_in`.
    pub in_peers: u32,

    /// Upper bound on outbound slots; becomes the controller's `max_out`.
    pub out_peers: u32,

    /// Initial reserved peers. Each one starts out in the `NotConnected` state and is
    /// tracked separately from regular peers, so it does not use `in_peers`/`out_peers` slots.
    pub reserved_nodes: HashSet<PeerId>,

    /// Initial value of the controller's reserved-only flag. While the flag is set, incoming
    /// connections from non-reserved peers are rejected and no outgoing candidates are
    /// requested from the peer store.
    pub reserved_only: bool,
}
106
/// Message sent by `ProtocolController` over its `to_notifications` channel.
#[derive(Debug, PartialEq)]
pub enum Message {
    /// Request to open a connection to a peer (emitted by `start_connection`).
    Connect {
        /// Set the request belongs to.
        set_id: SetId,
        /// Peer to connect to.
        peer_id: PeerId,
    },

    /// Request to drop the connection to a peer (emitted by `drop_connection`).
    Drop {
        /// Set the request belongs to.
        set_id: SetId,
        /// Peer to disconnect.
        peer_id: PeerId,
    },

    /// The incoming connection identified by this index is accepted.
    Accept(IncomingIndex),

    /// The incoming connection identified by this index is rejected.
    Reject(IncomingIndex),
}
133
/// Opaque index of an incoming connection; it is echoed back in `Message::Accept` /
/// `Message::Reject` so the receiver can tell which request is being answered.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IncomingIndex(pub u64);
137
138impl From<u64> for IncomingIndex {
139 fn from(val: u64) -> Self {
140 Self(val)
141 }
142}
143
/// Requests sent from `ProtocolHandle` to `ProtocolController` over the actions channel.
#[derive(Debug)]
enum Action {
    /// Mark a peer as reserved.
    AddReservedPeer(PeerId),
    /// Stop treating a peer as reserved.
    RemoveReservedPeer(PeerId),
    /// Replace the whole reserved set with the given one.
    SetReservedPeers(HashSet<PeerId>),
    /// Switch reserved-only mode on (`true`) or off (`false`).
    SetReservedOnly(bool),
    /// Disconnect a peer (requests targeting reserved peers are ignored by the controller).
    DisconnectPeer(PeerId),
    /// Report the current reserved peers through the provided oneshot sender.
    GetReservedPeers(oneshot::Sender<Vec<PeerId>>),
}
160
/// Connection events sent from `ProtocolHandle` to `ProtocolController` over the events
/// channel.
#[derive(Debug)]
enum Event {
    /// A peer is trying to connect to us; the controller answers with
    /// `Message::Accept` or `Message::Reject` carrying the same index.
    IncomingConnection(PeerId, IncomingIndex),
    /// The connection to a peer was dropped.
    Dropped(PeerId),
}
169
/// Cloneable handle used to talk to a `ProtocolController`.
///
/// It only holds the sending halves of the two channels created in
/// `ProtocolController::new`.
#[derive(Debug, Clone)]
pub struct ProtocolHandle {
    /// Sender for [`Action`] requests.
    actions_tx: TracingUnboundedSender<Action>,
    /// Sender for connection [`Event`]s.
    events_tx: TracingUnboundedSender<Event>,
}
179
180impl ProtocolHandle {
181 pub fn add_reserved_peer(&self, peer_id: PeerId) {
189 let _ = self.actions_tx.unbounded_send(Action::AddReservedPeer(peer_id));
190 }
191
192 pub fn remove_reserved_peer(&self, peer_id: PeerId) {
196 let _ = self.actions_tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
197 }
198
199 pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
201 let _ = self.actions_tx.unbounded_send(Action::SetReservedPeers(peer_ids));
202 }
203
204 pub fn set_reserved_only(&self, reserved: bool) {
207 let _ = self.actions_tx.unbounded_send(Action::SetReservedOnly(reserved));
208 }
209
210 pub fn disconnect_peer(&self, peer_id: PeerId) {
213 let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id));
214 }
215
216 pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
218 let _ = self.actions_tx.unbounded_send(Action::GetReservedPeers(pending_response));
219 }
220
221 pub fn incoming_connection(&self, peer_id: PeerId, incoming_index: IncomingIndex) {
223 let _ = self
224 .events_tx
225 .unbounded_send(Event::IncomingConnection(peer_id, incoming_index));
226 }
227
228 pub fn dropped(&self, peer_id: PeerId) {
230 let _ = self.events_tx.unbounded_send(Event::Dropped(peer_id));
231 }
232}
233
234impl ProtocolHandleT for ProtocolHandle {
235 fn disconnect_peer(&self, peer_id: sc_network_types::PeerId) {
236 let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id.into()));
237 }
238}
239
/// Direction of a connection, used to decide which slot counter it is charged to.
#[derive(Clone, Copy, Debug)]
enum Direction {
    /// The remote peer connected to us.
    Inbound,
    /// We initiated the connection.
    Outbound,
}
246
/// Connection state tracked for every reserved peer.
#[derive(Clone, Debug)]
enum PeerState {
    /// The peer is considered connected, with the given direction.
    Connected(Direction),
    /// The peer is not connected.
    NotConnected,
}
255
256impl PeerState {
257 fn is_connected(&self) -> bool {
259 matches!(self, PeerState::Connected(_))
260 }
261}
262
263impl Default for PeerState {
264 fn default() -> PeerState {
265 PeerState::NotConnected
266 }
267}
268
/// Per-set controller that decides which peers to connect to, accept, reject or drop.
///
/// It consumes [`Action`]s and [`Event`]s sent by `ProtocolHandle` and emits [`Message`]s
/// on `to_notifications`.
#[derive(Debug)]
pub struct ProtocolController {
    /// Set this controller manages; included in `Connect`/`Drop` messages and in logs.
    set_id: SetId,
    /// Receiver for requests sent through `ProtocolHandle`.
    actions_rx: TracingUnboundedReceiver<Action>,
    /// Receiver for connection events sent through `ProtocolHandle`.
    events_rx: TracingUnboundedReceiver<Event>,
    /// Number of occupied inbound slots (regular peers only; reserved peers are not counted).
    num_in: u32,
    /// Number of occupied outbound slots (regular peers only; reserved peers are not counted).
    num_out: u32,
    /// Maximum number of inbound slots.
    max_in: u32,
    /// Maximum number of outbound slots.
    max_out: u32,
    /// Connected regular (non-reserved) peers and the direction of each connection.
    nodes: HashMap<PeerId, Direction>,
    /// Reserved peers and their connection state. A peer is never in both maps at once.
    reserved_nodes: HashMap<PeerId, PeerState>,
    /// When `true`, only reserved peers may be connected.
    reserved_only: bool,
    /// Instant at which `next_action` will next run `alloc_slots` from its timer.
    next_periodic_alloc_slots: Instant,
    /// Outgoing channel for `Connect`/`Drop`/`Accept`/`Reject` messages.
    to_notifications: TracingUnboundedSender<Message>,
    /// Peer store queried for ban status and outgoing candidates, and told about disconnects.
    peer_store: Arc<dyn PeerStoreProvider>,
}
301
302impl ProtocolController {
303 pub fn new(
305 set_id: SetId,
306 config: ProtoSetConfig,
307 to_notifications: TracingUnboundedSender<Message>,
308 peer_store: Arc<dyn PeerStoreProvider>,
309 ) -> (ProtocolHandle, ProtocolController) {
310 let (actions_tx, actions_rx) = tracing_unbounded("mpsc_api_protocol", 10_000);
311 let (events_tx, events_rx) = tracing_unbounded("mpsc_notifications_protocol", 10_000);
312 let handle = ProtocolHandle { actions_tx, events_tx };
313 peer_store.register_protocol(Arc::new(handle.clone()));
314 let reserved_nodes =
315 config.reserved_nodes.iter().map(|p| (*p, PeerState::NotConnected)).collect();
316 let controller = ProtocolController {
317 set_id,
318 actions_rx,
319 events_rx,
320 num_in: 0,
321 num_out: 0,
322 max_in: config.in_peers,
323 max_out: config.out_peers,
324 nodes: HashMap::new(),
325 reserved_nodes,
326 reserved_only: config.reserved_only,
327 next_periodic_alloc_slots: Instant::now(),
328 to_notifications,
329 peer_store,
330 };
331 (handle, controller)
332 }
333
334 pub async fn run(mut self) {
337 while self.next_action().await {}
338 }
339
    /// Waits for and processes exactly one event or action.
    ///
    /// While waiting, the periodic timer may fire any number of times; each time it does,
    /// `alloc_slots` is run and the timer is re-armed one second later.
    ///
    /// Returns `false` when either the events or the actions channel has ended (its stream
    /// yields `None`), `true` after an event or action has been processed.
    pub async fn next_action(&mut self) -> bool {
        let either = loop {
            // A fresh timer future is built on every iteration from the stored deadline.
            let mut next_alloc_slots = Delay::new_at(self.next_periodic_alloc_slots).fuse();

            // `select_biased!` polls the branches in the order written: events first,
            // then actions, then the timer.
            futures::select_biased! {
                event = self.events_rx.next() => match event {
                    Some(event) => break Either::Left(event),
                    None => return false,
                },
                action = self.actions_rx.next() => match action {
                    Some(action) => break Either::Right(action),
                    None => return false,
                },
                _ = next_alloc_slots => {
                    // Timer tick: allocate slots, re-arm the timer, and keep waiting.
                    self.alloc_slots();
                    self.next_periodic_alloc_slots = Instant::now() + Duration::new(1, 0);
                },
            }
        };

        match either {
            Either::Left(event) => self.process_event(event),
            Either::Right(action) => self.process_action(action),
        }

        true
    }
371
372 fn process_event(&mut self, event: Event) {
374 match event {
375 Event::IncomingConnection(peer_id, index) => {
376 self.on_incoming_connection(peer_id, index)
377 },
378 Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
379 }
380 }
381
382 fn process_action(&mut self, action: Action) {
384 match action {
385 Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
386 Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
387 Action::SetReservedPeers(peer_ids) => self.on_set_reserved_peers(peer_ids),
388 Action::SetReservedOnly(reserved_only) => self.on_set_reserved_only(reserved_only),
389 Action::DisconnectPeer(peer_id) => self.on_disconnect_peer(peer_id),
390 Action::GetReservedPeers(pending_response) => {
391 self.on_get_reserved_peers(pending_response)
392 },
393 }
394 }
395
396 fn accept_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
398 trace!(
399 target: LOG_TARGET,
400 "Accepting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
401 self.set_id,
402 self.num_in,
403 self.max_in,
404 );
405
406 let _ = self.to_notifications.unbounded_send(Message::Accept(incoming_index));
407 }
408
409 fn reject_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
411 trace!(
412 target: LOG_TARGET,
413 "Rejecting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
414 self.set_id,
415 self.num_in,
416 self.max_in,
417 );
418
419 let _ = self.to_notifications.unbounded_send(Message::Reject(incoming_index));
420 }
421
422 fn start_connection(&mut self, peer_id: PeerId) {
424 trace!(
425 target: LOG_TARGET,
426 "Connecting to {peer_id} on {:?} ({}/{} num_out/max_out).",
427 self.set_id,
428 self.num_out,
429 self.max_out,
430 );
431
432 let _ = self
433 .to_notifications
434 .unbounded_send(Message::Connect { set_id: self.set_id, peer_id });
435 }
436
437 fn drop_connection(&mut self, peer_id: PeerId) {
439 trace!(
440 target: LOG_TARGET,
441 "Dropping {peer_id} on {:?} ({}/{} num_in/max_in, {}/{} num_out/max_out).",
442 self.set_id,
443 self.num_in,
444 self.max_in,
445 self.num_out,
446 self.max_out,
447 );
448
449 let _ = self
450 .to_notifications
451 .unbounded_send(Message::Drop { set_id: self.set_id, peer_id });
452 }
453
454 fn report_disconnect(&mut self, peer_id: PeerId) {
457 self.peer_store.report_disconnect(peer_id.into());
458 }
459
460 fn is_banned(&self, peer_id: &PeerId) -> bool {
462 self.peer_store.is_banned(&peer_id.into())
463 }
464
465 fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
468 if self.reserved_nodes.contains_key(&peer_id) {
469 debug!(
470 target: LOG_TARGET,
471 "Trying to add an already reserved node {peer_id} as reserved on {:?}.",
472 self.set_id,
473 );
474 return;
475 }
476
477 let state = match self.nodes.remove(&peer_id) {
479 Some(direction) => {
480 trace!(
481 target: LOG_TARGET,
482 "Marking previously connected node {} ({:?}) as reserved on {:?}.",
483 peer_id,
484 direction,
485 self.set_id
486 );
487 PeerState::Connected(direction)
488 },
489 None => {
490 trace!(target: LOG_TARGET, "Adding reserved node {peer_id} on {:?}.", self.set_id,);
491 PeerState::NotConnected
492 },
493 };
494
495 self.reserved_nodes.insert(peer_id, state.clone());
496
497 match state {
499 PeerState::Connected(Direction::Inbound) => self.num_in -= 1,
500 PeerState::Connected(Direction::Outbound) => self.num_out -= 1,
501 PeerState::NotConnected => self.alloc_slots(),
502 }
503 }
504
505 fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
508 let state = match self.reserved_nodes.remove(&peer_id) {
509 Some(state) => state,
510 None => {
511 warn!(
512 target: LOG_TARGET,
513 "Trying to remove unknown reserved node {peer_id} from {:?}.", self.set_id,
514 );
515 return;
516 },
517 };
518
519 if let PeerState::Connected(direction) = state {
520 let disconnect = self.reserved_only ||
522 match direction {
523 Direction::Inbound => self.num_in >= self.max_in,
524 Direction::Outbound => self.num_out >= self.max_out,
525 };
526
527 if disconnect {
528 trace!(
530 target: LOG_TARGET,
531 "Disconnecting previously reserved node {peer_id} ({direction:?}) on {:?}.",
532 self.set_id,
533 );
534 self.drop_connection(peer_id);
535 } else {
536 trace!(
538 target: LOG_TARGET,
539 "Making a connected reserved node {peer_id} ({:?}) on {:?} a regular one.",
540 direction,
541 self.set_id,
542 );
543
544 match direction {
545 Direction::Inbound => self.num_in += 1,
546 Direction::Outbound => self.num_out += 1,
547 }
548
549 let prev = self.nodes.insert(peer_id, direction);
551 assert!(prev.is_none(), "Corrupted state: reserved node was also non-reserved.");
552 }
553 } else {
554 trace!(
555 target: LOG_TARGET,
556 "Removed disconnected reserved node {peer_id} from {:?}.",
557 self.set_id,
558 );
559 }
560 }
561
562 fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
564 let current = self.reserved_nodes.keys().cloned().collect();
566 let to_insert = peer_ids.difference(¤t).cloned().collect::<Vec<_>>();
567 let to_remove = current.difference(&peer_ids).cloned().collect::<Vec<_>>();
568
569 for node in to_insert {
570 self.on_add_reserved_peer(node);
571 }
572
573 for node in to_remove {
574 self.on_remove_reserved_peer(node);
575 }
576 }
577
578 fn on_set_reserved_only(&mut self, reserved_only: bool) {
581 trace!(target: LOG_TARGET, "Set reserved only to `{reserved_only}` on {:?}", self.set_id);
582
583 self.reserved_only = reserved_only;
584
585 if !reserved_only {
586 return self.alloc_slots();
587 }
588
589 self.nodes
591 .iter()
592 .map(|(k, v)| (*k, *v))
593 .collect::<Vec<(_, _)>>()
594 .iter()
595 .for_each(|(peer_id, direction)| {
596 match direction {
598 Direction::Inbound => self.num_in -= 1,
599 Direction::Outbound => self.num_out -= 1,
600 }
601 self.drop_connection(*peer_id)
602 });
603 self.nodes.clear();
604 }
605
606 fn on_get_reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
608 let _ = pending_response.send(self.reserved_nodes.keys().cloned().collect());
609 }
610
611 fn on_disconnect_peer(&mut self, peer_id: PeerId) {
613 if self.reserved_nodes.contains_key(&peer_id) {
615 debug!(
616 target: LOG_TARGET,
617 "Ignoring request to disconnect reserved peer {peer_id} from {:?}.", self.set_id,
618 );
619 return;
620 }
621
622 match self.nodes.remove(&peer_id) {
623 Some(direction) => {
624 trace!(
625 target: LOG_TARGET,
626 "Disconnecting peer {peer_id} ({direction:?}) from {:?}.",
627 self.set_id
628 );
629 match direction {
630 Direction::Inbound => self.num_in -= 1,
631 Direction::Outbound => self.num_out -= 1,
632 }
633 self.drop_connection(peer_id);
634 },
635 None => {
636 debug!(
637 target: LOG_TARGET,
638 "Trying to disconnect unknown peer {peer_id} from {:?}.", self.set_id,
639 );
640 },
641 }
642 }
643
    /// Decides whether to accept or reject an incoming connection and answers with
    /// `Message::Accept` / `Message::Reject` carrying `incoming_index`.
    ///
    /// Decision order:
    /// 1. In reserved-only mode, non-reserved peers are rejected.
    /// 2. Reserved peers: accepted if already marked connected (direction is overwritten
    ///    with `Inbound`); otherwise accepted unless banned.
    /// 3. Regular peers: any existing entry is discarded and its slot released, then the
    ///    peer is rejected if no inbound slot is free or if it is banned, and accepted
    ///    (occupying an inbound slot) otherwise.
    fn on_incoming_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
        trace!(
            target: LOG_TARGET,
            "Incoming connection from peer {peer_id} ({incoming_index:?}) on {:?}.",
            self.set_id,
        );

        // Reserved-only mode: only reserved peers get past this point.
        if self.reserved_only && !self.reserved_nodes.contains_key(&peer_id) {
            self.reject_connection(peer_id, incoming_index);
            return;
        }

        // Reserved peers never touch `num_in` / `num_out`.
        if let Some(state) = self.reserved_nodes.get_mut(&peer_id) {
            match state {
                PeerState::Connected(ref mut direction) => {
                    // We already consider this peer connected: record the new direction
                    // and accept without consulting the ban list.
                    *direction = Direction::Inbound;
                    self.accept_connection(peer_id, incoming_index);
                },
                PeerState::NotConnected => {
                    // `self.peer_store` is accessed directly (not via `self.is_banned`)
                    // because `state` still mutably borrows `self.reserved_nodes`.
                    if self.peer_store.is_banned(&peer_id.into()) {
                        self.reject_connection(peer_id, incoming_index);
                    } else {
                        *state = PeerState::Connected(Direction::Inbound);
                        self.accept_connection(peer_id, incoming_index);
                    }
                },
            }
            return;
        }

        // A regular peer we believed to be connected: forget the old entry and free its
        // slot, then evaluate the new connection from scratch. If it is rejected below,
        // the peer stays removed from `nodes`.
        if let Some(direction) = self.nodes.remove(&peer_id) {
            trace!(
                target: LOG_TARGET,
                "Handling incoming connection from peer {} we think we already connected as {:?} on {:?}.",
                peer_id,
                direction,
                self.set_id
            );
            match direction {
                Direction::Inbound => self.num_in -= 1,
                Direction::Outbound => self.num_out -= 1,
            }
        }

        // No free inbound slot.
        if self.num_in >= self.max_in {
            self.reject_connection(peer_id, incoming_index);
            return;
        }

        // The ban check happens only after the slot check.
        if self.is_banned(&peer_id) {
            self.reject_connection(peer_id, incoming_index);
            return;
        }

        // Occupy an inbound slot and accept.
        self.num_in += 1;
        self.nodes.insert(peer_id, Direction::Inbound);
        self.accept_connection(peer_id, incoming_index);
    }
718
719 fn on_peer_dropped(&mut self, peer_id: PeerId) {
721 self.on_peer_dropped_inner(peer_id).unwrap_or_else(|peer_id| {
722 trace!(
726 target: LOG_TARGET,
727 "Received `Action::Dropped` for not connected peer {peer_id} on {:?}.",
728 self.set_id,
729 )
730 });
731 }
732
733 fn on_peer_dropped_inner(&mut self, peer_id: PeerId) -> Result<(), PeerId> {
736 if self.drop_reserved_peer(&peer_id)? || self.drop_regular_peer(&peer_id) {
737 self.report_disconnect(peer_id);
739 Ok(())
740 } else {
741 Err(peer_id)
743 }
744 }
745
746 fn drop_reserved_peer(&mut self, peer_id: &PeerId) -> Result<bool, PeerId> {
750 let Some(state) = self.reserved_nodes.get_mut(peer_id) else { return Ok(false) };
751
752 if let PeerState::Connected(direction) = state {
753 trace!(
754 target: LOG_TARGET,
755 "Reserved peer {peer_id} ({direction:?}) dropped from {:?}.",
756 self.set_id,
757 );
758 *state = PeerState::NotConnected;
759 Ok(true)
760 } else {
761 Err(*peer_id)
762 }
763 }
764
765 fn drop_regular_peer(&mut self, peer_id: &PeerId) -> bool {
768 let Some(direction) = self.nodes.remove(peer_id) else { return false };
769
770 trace!(
771 target: LOG_TARGET,
772 "Peer {peer_id} ({direction:?}) dropped from {:?}.",
773 self.set_id,
774 );
775
776 match direction {
777 Direction::Inbound => self.num_in -= 1,
778 Direction::Outbound => self.num_out -= 1,
779 }
780
781 true
782 }
783
    /// Initiates outgoing connections.
    ///
    /// First, every reserved peer that is neither connected nor banned is marked as
    /// `Connected(Outbound)` and a `Connect` message is sent for it. Then, unless
    /// reserved-only mode is on or all outbound slots are taken, the peer store is asked
    /// for candidates to fill the free outbound slots; each accepted candidate occupies a
    /// slot and gets a `Connect` message.
    fn alloc_slots(&mut self) {
        // Reserved peers: update their state while iterating, but collect the ids into a
        // `Vec` first so the mutable borrow of `reserved_nodes` has ended by the time
        // `start_connection` (which needs `&mut self`) is called.
        self.reserved_nodes
            .iter_mut()
            .filter_map(|(peer_id, state)| {
                (!state.is_connected() && !self.peer_store.is_banned(&peer_id.into())).then(|| {
                    *state = PeerState::Connected(Direction::Outbound);
                    peer_id
                })
            })
            .cloned()
            .collect::<Vec<_>>()
            .into_iter()
            .for_each(|peer_id| {
                self.start_connection(peer_id);
            });

        // Nothing more to do if regular peers are not allowed or no outbound slot is free.
        if self.reserved_only || self.num_out >= self.max_out {
            return;
        }

        // Cannot underflow: `num_out < max_out` was established just above.
        let available_slots = (self.max_out - self.num_out).saturated_into();

        // Peers we already track (reserved or regular) must not be proposed again.
        let ignored = self
            .reserved_nodes
            .keys()
            .map(From::from)
            .collect::<HashSet<sc_network_types::PeerId>>()
            .union(
                &self.nodes.keys().map(From::from).collect::<HashSet<sc_network_types::PeerId>>(),
            )
            .cloned()
            .collect();

        // Defensive re-check: drop (and loudly report) any candidate that we asked the
        // peer store to ignore.
        let candidates = self
            .peer_store
            .outgoing_candidates(available_slots, ignored)
            .into_iter()
            .filter_map(|peer_id| {
                (!self.reserved_nodes.contains_key(&peer_id.into()) &&
                    !self.nodes.contains_key(&peer_id.into()))
                .then_some(peer_id)
                .or_else(|| {
                    error!(
                        target: LOG_TARGET,
                        "`PeerStore` returned a node we asked to ignore: {peer_id}.",
                    );
                    debug_assert!(false, "`PeerStore` returned a node we asked to ignore.");
                    None
                })
            })
            .collect::<Vec<_>>();

        // Too many candidates is reported, then tolerated by truncating below.
        if candidates.len() > available_slots {
            error!(
                target: LOG_TARGET,
                "`PeerStore` returned more nodes than there are slots available.",
            );
            debug_assert!(false, "`PeerStore` returned more nodes than there are slots available.");
        }

        // Occupy an outbound slot for each candidate and request the connection.
        candidates.into_iter().take(available_slots).for_each(|peer_id| {
            self.num_out += 1;
            self.nodes.insert(peer_id.into(), Direction::Outbound);
            self.start_connection(peer_id.into());
        })
    }
857}
858
859#[cfg(test)]
860mod tests {
861 use super::*;
862 use crate::{
863 peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT},
864 ReputationChange,
865 };
866 use libp2p::PeerId;
867 use sc_network_common::role::ObservedRole;
868 use sc_utils::mpsc::{tracing_unbounded, TryRecvError};
869 use std::collections::HashSet;
870
    // Mock implementation of `PeerStoreProvider`, referred to as `MockPeerStoreHandle` in
    // the tests below, so that each test can script the peer store's answers.
    mockall::mock! {
        #[derive(Debug)]
        pub PeerStoreHandle {}

        impl PeerStoreProvider for PeerStoreHandle {
            fn is_banned(&self, peer_id: &sc_network_types::PeerId) -> bool;
            fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandleT>);
            fn report_disconnect(&self, peer_id: sc_network_types::PeerId);
            fn set_peer_role(&self, peer_id: &sc_network_types::PeerId, role: ObservedRole);
            fn report_peer(&self, peer_id: sc_network_types::PeerId, change: ReputationChange);
            fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32;
            fn peer_role(&self, peer_id: &sc_network_types::PeerId) -> Option<ObservedRole>;
            fn outgoing_candidates(&self, count: usize, ignored: HashSet<sc_network_types::PeerId>) -> Vec<sc_network_types::PeerId>;
            fn add_known_peer(&self, peer_id: sc_network_types::PeerId);
        }
    }
887
888 #[test]
889 fn reserved_nodes_are_connected_dropped_and_accepted() {
890 let reserved1 = PeerId::random();
891 let reserved2 = PeerId::random();
892
893 let config = ProtoSetConfig {
895 in_peers: 0,
896 out_peers: 0,
897 reserved_nodes: std::iter::once(reserved1).collect(),
898 reserved_only: true,
899 };
900 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
901
902 let mut peer_store = MockPeerStoreHandle::new();
903 peer_store.expect_register_protocol().once().return_const(());
904 peer_store.expect_is_banned().times(4).return_const(false);
905 peer_store.expect_report_disconnect().times(2).return_const(());
906
907 let (_handle, mut controller) =
908 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
909
910 controller.on_add_reserved_peer(reserved2);
912
913 controller.alloc_slots();
916
917 let mut messages = Vec::new();
918 while let Some(message) = rx.try_recv().ok() {
919 messages.push(message);
920 }
921 assert_eq!(messages.len(), 2);
922 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
923 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
924
925 assert_eq!(controller.num_out, 0);
927 assert_eq!(controller.num_in, 0);
928
929 controller.on_peer_dropped(reserved1);
931 controller.on_peer_dropped(reserved2);
932
933 let incoming1 = IncomingIndex(1);
935 controller.on_incoming_connection(reserved1, incoming1);
936 assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming1));
937 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
938
939 let incoming2 = IncomingIndex(2);
941 controller.on_incoming_connection(reserved2, incoming2);
942 assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming2));
943 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
944
945 assert_eq!(controller.num_out, 0);
947 assert_eq!(controller.num_in, 0);
948 }
949
950 #[test]
951 fn banned_reserved_nodes_are_not_connected_and_not_accepted() {
952 let reserved1 = PeerId::random();
953 let reserved2 = PeerId::random();
954
955 let config = ProtoSetConfig {
957 in_peers: 0,
958 out_peers: 0,
959 reserved_nodes: std::iter::once(reserved1).collect(),
960 reserved_only: true,
961 };
962 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
963
964 let mut peer_store = MockPeerStoreHandle::new();
965 peer_store.expect_register_protocol().once().return_const(());
966 peer_store.expect_is_banned().times(6).return_const(true);
967
968 let (_handle, mut controller) =
969 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
970
971 controller.on_add_reserved_peer(reserved2);
973
974 controller.alloc_slots();
976
977 assert_eq!(controller.num_out, 0);
979 assert_eq!(controller.num_in, 0);
980
981 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
983
984 let incoming1 = IncomingIndex(1);
986 controller.on_incoming_connection(reserved1, incoming1);
987 assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming1));
988 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
989
990 let incoming2 = IncomingIndex(2);
992 controller.on_incoming_connection(reserved2, incoming2);
993 assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming2));
994 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
995
996 assert_eq!(controller.num_out, 0);
998 assert_eq!(controller.num_in, 0);
999 }
1000
1001 #[test]
1002 fn we_try_to_reconnect_to_dropped_reserved_nodes() {
1003 let reserved1 = PeerId::random();
1004 let reserved2 = PeerId::random();
1005
1006 let config = ProtoSetConfig {
1008 in_peers: 0,
1009 out_peers: 0,
1010 reserved_nodes: std::iter::once(reserved1).collect(),
1011 reserved_only: true,
1012 };
1013 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1014
1015 let mut peer_store = MockPeerStoreHandle::new();
1016 peer_store.expect_register_protocol().once().return_const(());
1017 peer_store.expect_is_banned().times(4).return_const(false);
1018 peer_store.expect_report_disconnect().times(2).return_const(());
1019
1020 let (_handle, mut controller) =
1021 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1022
1023 controller.on_add_reserved_peer(reserved2);
1025
1026 controller.alloc_slots();
1028
1029 let mut messages = Vec::new();
1030 while let Some(message) = rx.try_recv().ok() {
1031 messages.push(message);
1032 }
1033
1034 assert_eq!(messages.len(), 2);
1035 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1036 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1037
1038 controller.on_peer_dropped(reserved1);
1040 controller.on_peer_dropped(reserved2);
1041
1042 controller.alloc_slots();
1044
1045 let mut messages = Vec::new();
1046 while let Some(message) = rx.try_recv().ok() {
1047 messages.push(message);
1048 }
1049
1050 assert_eq!(messages.len(), 2);
1051 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1052 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1053
1054 assert_eq!(controller.num_out, 0);
1056 assert_eq!(controller.num_in, 0);
1057 }
1058
1059 #[test]
1060 fn nodes_supplied_by_peer_store_are_connected() {
1061 let peer1 = PeerId::random();
1062 let peer2 = PeerId::random();
1063 let candidates = vec![peer1.into(), peer2.into()];
1064
1065 let config = ProtoSetConfig {
1066 in_peers: 0,
1067 out_peers: 2,
1069 reserved_nodes: HashSet::new(),
1070 reserved_only: false,
1071 };
1072 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1073
1074 let mut peer_store = MockPeerStoreHandle::new();
1075 peer_store.expect_register_protocol().once().return_const(());
1076 peer_store.expect_outgoing_candidates().once().return_const(candidates);
1077
1078 let (_handle, mut controller) =
1079 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1080
1081 controller.alloc_slots();
1083
1084 let mut messages = Vec::new();
1085 while let Some(message) = rx.try_recv().ok() {
1086 messages.push(message);
1087 }
1088
1089 assert_eq!(messages.len(), 2);
1091 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1092 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1093
1094 assert_eq!(controller.num_out, 2);
1096 assert_eq!(controller.num_in, 0);
1097
1098 controller.alloc_slots();
1100 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1101
1102 assert_eq!(controller.num_out, 2);
1104 assert_eq!(controller.num_in, 0);
1105 }
1106
1107 #[test]
1108 fn both_reserved_nodes_and_nodes_supplied_by_peer_store_are_connected() {
1109 let reserved1 = PeerId::random();
1110 let reserved2 = PeerId::random();
1111 let regular1 = PeerId::random();
1112 let regular2 = PeerId::random();
1113 let outgoing_candidates = vec![regular1.into(), regular2.into()];
1114 let reserved_nodes = [reserved1, reserved2].iter().cloned().collect();
1115
1116 let config =
1117 ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false };
1118 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1119
1120 let mut peer_store = MockPeerStoreHandle::new();
1121 peer_store.expect_register_protocol().once().return_const(());
1122 peer_store.expect_is_banned().times(2).return_const(false);
1123 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1124
1125 let (_handle, mut controller) =
1126 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1127
1128 controller.alloc_slots();
1130
1131 let mut messages = Vec::new();
1132 while let Some(message) = rx.try_recv().ok() {
1133 messages.push(message);
1134 }
1135 assert_eq!(messages.len(), 4);
1136 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1137 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1138 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1139 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
1140 assert_eq!(controller.num_out, 2);
1141 assert_eq!(controller.num_in, 0);
1142 }
1143
1144 #[test]
1145 fn if_slots_are_freed_we_try_to_allocate_them_again() {
1146 let peer1 = PeerId::random();
1147 let peer2 = PeerId::random();
1148 let peer3 = PeerId::random();
1149 let candidates1 = vec![peer1.into(), peer2.into()];
1150 let candidates2 = vec![peer3.into()];
1151
1152 let config = ProtoSetConfig {
1153 in_peers: 0,
1154 out_peers: 2,
1156 reserved_nodes: HashSet::new(),
1157 reserved_only: false,
1158 };
1159 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1160
1161 let mut peer_store = MockPeerStoreHandle::new();
1162 peer_store.expect_register_protocol().once().return_const(());
1163 peer_store.expect_outgoing_candidates().once().return_const(candidates1);
1164 peer_store.expect_outgoing_candidates().once().return_const(candidates2);
1165 peer_store.expect_report_disconnect().times(2).return_const(());
1166
1167 let (_handle, mut controller) =
1168 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1169
1170 controller.alloc_slots();
1172
1173 let mut messages = Vec::new();
1174 while let Some(message) = rx.try_recv().ok() {
1175 messages.push(message);
1176 }
1177
1178 assert_eq!(messages.len(), 2);
1180 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1181 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1182
1183 assert_eq!(controller.num_out, 2);
1185 assert_eq!(controller.num_in, 0);
1186
1187 controller.alloc_slots();
1189 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1190
1191 assert_eq!(controller.num_out, 2);
1193 assert_eq!(controller.num_in, 0);
1194
1195 controller.on_peer_dropped(peer1);
1197 controller.on_peer_dropped(peer2);
1198
1199 assert_eq!(controller.num_out, 0);
1201 assert_eq!(controller.num_in, 0);
1202
1203 controller.alloc_slots();
1205
1206 let mut messages = Vec::new();
1207 while let Some(message) = rx.try_recv().ok() {
1208 messages.push(message);
1209 }
1210
1211 assert_eq!(messages.len(), 1);
1213 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
1214
1215 assert_eq!(controller.num_out, 1);
1217 assert_eq!(controller.num_in, 0);
1218 }
1219
1220 #[test]
1221 fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() {
1222 let config = ProtoSetConfig {
1223 in_peers: 0,
1224 out_peers: 2,
1226 reserved_nodes: HashSet::new(),
1227 reserved_only: true,
1228 };
1229 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1230
1231 let mut peer_store = MockPeerStoreHandle::new();
1232 peer_store.expect_register_protocol().once().return_const(());
1233
1234 let (_handle, mut controller) =
1235 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1236
1237 controller.alloc_slots();
1239
1240 assert_eq!(controller.num_out, 0);
1242 assert_eq!(controller.num_in, 0);
1243 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1244 }
1245
1246 #[test]
1247 fn in_reserved_only_mode_no_regular_peers_are_accepted() {
1248 let config = ProtoSetConfig {
1249 in_peers: 2,
1251 out_peers: 0,
1252 reserved_nodes: HashSet::new(),
1253 reserved_only: true,
1254 };
1255 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1256
1257 let mut peer_store = MockPeerStoreHandle::new();
1258 peer_store.expect_register_protocol().once().return_const(());
1259
1260 let (_handle, mut controller) =
1261 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1262
1263 let peer = PeerId::random();
1264 let incoming_index = IncomingIndex(1);
1265 controller.on_incoming_connection(peer, incoming_index);
1266
1267 let mut messages = Vec::new();
1268 while let Some(message) = rx.try_recv().ok() {
1269 messages.push(message);
1270 }
1271
1272 assert_eq!(messages.len(), 1);
1274 assert!(messages.contains(&Message::Reject(incoming_index)));
1275 assert_eq!(controller.num_out, 0);
1276 assert_eq!(controller.num_in, 0);
1277 }
1278
/// Starts in reserved-only mode with two outgoing candidates available from the peer store.
/// The test expects no traffic while reserved-only is on, and a `Connect` for each candidate
/// once `on_set_reserved_only(false)` is called.
#[test]
fn disabling_reserved_only_mode_allows_to_connect_to_peers() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let outgoing = vec![peer1.into(), peer2.into()];

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    // Candidates must be requested exactly once over the whole test.
    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_outgoing_candidates().once().return_const(outgoing);

    let config = ProtoSetConfig {
        in_peers: 0,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: true,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));

    // Slot allocation in reserved-only mode: nothing is connected, nothing is sent.
    controller.alloc_slots();
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    // Leaving reserved-only mode.
    controller.on_set_reserved_only(false);

    let messages: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();

    assert_eq!(messages.len(), 2);
    for peer_id in [peer1, peer2] {
        assert!(messages.contains(&Message::Connect { set_id, peer_id }));
    }
    assert_eq!(controller.num_out, 2);
    assert_eq!(controller.num_in, 0);
}
1323
/// Connects two reserved peers plus one outbound and one inbound regular peer, then switches
/// reserved-only mode on. The test expects a `Drop` for both regular peers and empty regular
/// bookkeeping (`nodes`, `num_in`, `num_out`) afterwards.
#[test]
fn enabling_reserved_only_mode_disconnects_regular_peers() {
    let set_id = SetId::from(0);
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();
    let regular1 = PeerId::random();
    let regular2 = PeerId::random();
    let candidates = vec![regular1.into()];

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().times(3).return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(candidates);

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Slot allocation: both reserved peers and the regular candidate get a `Connect`.
    controller.alloc_slots();

    let connects: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(connects.len(), 3);
    for peer_id in [reserved1, reserved2, regular1] {
        assert!(connects.contains(&Message::Connect { set_id, peer_id }));
    }
    // Only the regular peer is counted against the outbound slots.
    assert_eq!(controller.num_out, 1);
    assert_eq!(controller.num_in, 0);

    // A second regular peer connects to us.
    let incoming_index = IncomingIndex(1);
    controller.on_incoming_connection(regular2, incoming_index);
    assert_eq!(rx.try_recv(), Ok(Message::Accept(incoming_index)));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(controller.num_out, 1);
    assert_eq!(controller.num_in, 1);

    // Switching to reserved-only mode.
    controller.on_set_reserved_only(true);

    let drops: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(drops.len(), 2);
    for peer_id in [regular1, regular2] {
        assert!(drops.contains(&Message::Drop { set_id, peer_id }));
    }
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);
}
1386
/// Removes a reserved peer that was never connected. The test expects the peer to vanish from
/// `reserved_nodes` without any message being sent and without touching regular bookkeeping.
#[test]
fn removed_disconnected_reserved_node_is_forgotten() {
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));

    // Initial state: both reserved peers known, nothing connected.
    assert_eq!(controller.reserved_nodes.len(), 2);
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    controller.on_remove_reserved_peer(reserved1);

    // No message goes out; only `reserved1` is forgotten.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(controller.reserved_nodes.len(), 1);
    assert!(controller.reserved_nodes.get(&reserved1).is_none());
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);
}
1418
/// In reserved-only mode, connects both reserved peers and then removes one of them from the
/// reserved set. The test expects a `Drop` for the removed peer and that it does not reappear
/// among the regular `nodes`.
#[test]
fn removed_connected_reserved_node_is_disconnected_in_reserved_only_mode() {
    let set_id = SetId::from(0);
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().times(2).return_const(false);

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: true,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));

    // Slot allocation is expected to initiate connections to both reserved peers.
    controller.alloc_slots();
    let messages: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(messages.len(), 2);
    for peer_id in [reserved1, reserved2] {
        assert!(messages.contains(&Message::Connect { set_id, peer_id }));
        assert!(controller.reserved_nodes.contains_key(&peer_id));
    }
    assert_eq!(controller.reserved_nodes.len(), 2);
    assert_eq!(controller.nodes.len(), 0);

    // Removing a connected reserved peer while in reserved-only mode.
    controller.on_remove_reserved_peer(reserved1);
    assert_eq!(rx.try_recv(), Ok(Message::Drop { set_id, peer_id: reserved1 }));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(controller.reserved_nodes.len(), 1);
    assert!(controller.reserved_nodes.contains_key(&reserved2));
    assert_eq!(controller.nodes.len(), 0);
}
1464
/// Connects one reserved peer inbound and one outbound, then removes both from the reserved
/// set while reserved-only mode is off. The test expects no messages, and both peers to show
/// up in `nodes` with their direction preserved and counted in `num_in` / `num_out`.
#[test]
fn removed_connected_reserved_nodes_become_regular_in_non_reserved_mode() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().times(2).return_const(false);
    peer_store
        .expect_outgoing_candidates()
        .once()
        .return_const(Vec::<sc_network_types::PeerId>::new());

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [peer1, peer2].iter().copied().collect(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));

    // `peer1` connects to us; `peer2` is expected to be dialed by slot allocation.
    controller.on_incoming_connection(peer1, IncomingIndex(1));
    controller.alloc_slots();

    let messages: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(messages.len(), 2);
    assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
    assert!(messages.contains(&Message::Connect { set_id, peer_id: peer2 }));
    // While reserved, the peers are not counted against the regular slots.
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Demote both peers from reserved.
    controller.on_remove_reserved_peer(peer1);
    controller.on_remove_reserved_peer(peer2);

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(controller.nodes.len(), 2);
    assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Inbound)));
    assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Outbound)));
    assert_eq!(controller.num_out, 1);
    assert_eq!(controller.num_in, 1);
}
1512
/// Connects one outbound and one inbound regular peer, then adds both to the reserved set.
/// The test expects no messages and both slot counters to drop back to zero.
#[test]
fn regular_nodes_stop_occupying_slots_when_become_reserved() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let candidates = vec![peer1.into()];

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(candidates);

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));

    // `peer1` is dialed from the candidate list, `peer2` connects to us.
    controller.alloc_slots();
    controller.on_incoming_connection(peer2, IncomingIndex(1));

    let messages: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(messages.len(), 2);
    assert!(messages.contains(&Message::Connect { set_id, peer_id: peer1 }));
    assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 1);

    // Promote both connected peers to reserved.
    for peer_id in [peer1, peer2] {
        controller.on_add_reserved_peer(peer_id);
    }

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(controller.num_in, 0);
    assert_eq!(controller.num_out, 0);
}
1554
/// Connects one outbound and one inbound regular peer and disconnects them one after the
/// other via `on_disconnect_peer`. The test expects a `Drop` per peer, with `nodes` and the
/// matching slot counter shrinking at each step.
#[test]
fn disconnecting_regular_peers_work() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let candidates = vec![peer1.into()];

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(candidates);

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));

    // `peer1` is dialed from the candidate list, `peer2` connects to us.
    controller.alloc_slots();
    controller.on_incoming_connection(peer2, IncomingIndex(1));

    let messages: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(messages.len(), 2);
    assert!(messages.contains(&Message::Connect { set_id, peer_id: peer1 }));
    assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
    assert_eq!(controller.nodes.len(), 2);
    assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
    assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 1);

    // Disconnect the outbound peer first.
    controller.on_disconnect_peer(peer1);
    assert_eq!(rx.try_recv(), Ok(Message::Drop { set_id, peer_id: peer1 }));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(controller.nodes.len(), 1);
    assert!(controller.nodes.get(&peer1).is_none());
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 0);

    // Then the inbound one.
    controller.on_disconnect_peer(peer2);
    assert_eq!(rx.try_recv(), Ok(Message::Drop { set_id, peer_id: peer2 }));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_in, 0);
    assert_eq!(controller.num_out, 0);
}
1614
/// Connects one reserved peer inbound and one outbound, then calls `on_disconnect_peer` on
/// each. The test expects no message and no change in the recorded connection state.
#[test]
fn disconnecting_reserved_peers_is_a_noop() {
    let set_id = SetId::from(0);
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().times(2).return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(Vec::new());

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));

    // `reserved1` connects to us; `reserved2` is expected to be dialed by slot allocation.
    controller.on_incoming_connection(reserved1, IncomingIndex(1));
    controller.alloc_slots();

    let messages: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(messages.len(), 2);
    assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
    assert!(messages.contains(&Message::Connect { set_id, peer_id: reserved2 }));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved1),
        Some(PeerState::Connected(Direction::Inbound))
    ));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved2),
        Some(PeerState::Connected(Direction::Outbound))
    ));

    // Disconnect requests for reserved peers: nothing sent, state untouched.
    controller.on_disconnect_peer(reserved1);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved1),
        Some(PeerState::Connected(Direction::Inbound))
    ));

    controller.on_disconnect_peer(reserved2);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved2),
        Some(PeerState::Connected(Direction::Outbound))
    ));
}
1669
/// Connects one outbound and one inbound regular peer and then signals that each was dropped
/// via `on_peer_dropped`. The test expects no outgoing message, the peer store to receive two
/// `report_disconnect` calls, and `nodes` plus the slot counters to shrink accordingly.
#[test]
fn dropping_regular_peers_work() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let candidates = vec![peer1.into()];

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(candidates);
    peer_store.expect_report_disconnect().times(2).return_const(());

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));

    // `peer1` is dialed from the candidate list, `peer2` connects to us.
    controller.alloc_slots();
    controller.on_incoming_connection(peer2, IncomingIndex(1));

    let messages: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(messages.len(), 2);
    assert!(messages.contains(&Message::Connect { set_id, peer_id: peer1 }));
    assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
    assert_eq!(controller.nodes.len(), 2);
    assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
    assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 1);

    // The outbound peer goes away.
    controller.on_peer_dropped(peer1);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(controller.nodes.len(), 1);
    assert!(controller.nodes.get(&peer1).is_none());
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 0);

    // The inbound peer goes away.
    controller.on_peer_dropped(peer2);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_in, 0);
    assert_eq!(controller.num_out, 0);
}
1722
/// Sends further incoming requests from reserved peers that are already connected. The test
/// expects each request to be accepted and the peer to be recorded as `Inbound` afterwards,
/// including the peer that was previously connected outbound.
#[test]
fn incoming_request_for_connected_reserved_node_switches_it_to_inbound() {
    let set_id = SetId::from(0);
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().times(2).return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(Vec::new());

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(set_id, config, tx, Arc::new(peer_store));

    // `reserved1` connects to us; `reserved2` is expected to be dialed by slot allocation.
    controller.on_incoming_connection(reserved1, IncomingIndex(1));
    controller.alloc_slots();

    let messages: Vec<_> = std::iter::from_fn(|| rx.try_recv().ok()).collect();
    assert_eq!(messages.len(), 2);
    assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
    assert!(messages.contains(&Message::Connect { set_id, peer_id: reserved2 }));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved1),
        Some(PeerState::Connected(Direction::Inbound))
    ));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved2),
        Some(PeerState::Connected(Direction::Outbound))
    ));

    // Repeated incoming request from the peer that is already inbound.
    controller.on_incoming_connection(reserved1, IncomingIndex(2));
    assert_eq!(rx.try_recv(), Ok(Message::Accept(IncomingIndex(2))));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved1),
        Some(PeerState::Connected(Direction::Inbound))
    ));

    // Incoming request from the peer that was connected outbound.
    controller.on_incoming_connection(reserved2, IncomingIndex(3));
    assert_eq!(rx.try_recv(), Ok(Message::Accept(IncomingIndex(3))));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved2),
        Some(PeerState::Connected(Direction::Inbound))
    ));
}
1781
/// Sends further incoming requests from regular peers that are already connected. The test
/// expects each request to be accepted and the peer to be recorded as `Inbound` afterwards,
/// including the peer that was previously connected outbound.
#[test]
fn incoming_request_for_connected_regular_node_switches_it_to_inbound() {
    let regular1 = PeerId::random();
    let regular2 = PeerId::random();
    let outgoing_candidates = vec![regular1.into()];

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().times(3).return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Slot allocation is expected to dial `regular1` from the candidate list.
    controller.alloc_slots();
    assert_eq!(
        rx.try_recv().unwrap(),
        Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
    );
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

    // `regular2` connects to us.
    controller.on_incoming_connection(regular2, IncomingIndex(0));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

    // Incoming request from the peer that was connected outbound.
    controller.on_incoming_connection(regular1, IncomingIndex(1));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(1)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Inbound)));

    // Repeated incoming request from the peer that is already inbound.
    controller.on_incoming_connection(regular2, IncomingIndex(2));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(2)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));
}
1833
/// Connects two regular peers while the peer store reports them as not banned, then has the
/// peer store report `is_banned == true` for the following incoming requests. The test expects
/// those requests to be rejected and the peers to be removed from `nodes`.
#[test]
fn incoming_request_for_connected_node_is_rejected_if_its_banned() {
    let regular1 = PeerId::random();
    let regular2 = PeerId::random();
    let outgoing_candidates = vec![regular1.into()];

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    // The order of these two expectations matters: one "not banned" answer first, then two
    // "banned" answers.
    peer_store.expect_is_banned().once().return_const(false);
    peer_store.expect_is_banned().times(2).return_const(true);
    peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Slot allocation is expected to dial `regular1` from the candidate list.
    controller.alloc_slots();
    assert_eq!(
        rx.try_recv().unwrap(),
        Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
    );
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

    // `regular2` connects to us and is accepted.
    controller.on_incoming_connection(regular2, IncomingIndex(0));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

    // Incoming request from the connected outbound peer, now reported as banned.
    controller.on_incoming_connection(regular1, IncomingIndex(1));
    assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(!controller.nodes.contains_key(&regular1));

    // Incoming request from the connected inbound peer, now reported as banned.
    controller.on_incoming_connection(regular2, IncomingIndex(2));
    assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(!controller.nodes.contains_key(&regular2));
}
1886
/// Connects two regular peers, then sets `max_in` to zero and sends incoming requests from
/// both. The test expects those requests to be rejected and the peers to be removed from
/// `nodes`.
#[test]
fn incoming_request_for_connected_node_is_rejected_if_no_slots_available() {
    let regular1 = PeerId::random();
    let regular2 = PeerId::random();
    let outgoing_candidates = vec![regular1.into()];

    let config = ProtoSetConfig {
        in_peers: 1,
        out_peers: 1,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Slot allocation is expected to dial `regular1` from the candidate list.
    controller.alloc_slots();
    assert_eq!(
        rx.try_recv().unwrap(),
        Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
    );
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

    // `regular2` connects to us and takes the single inbound slot.
    controller.on_incoming_connection(regular2, IncomingIndex(0));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

    // Remove all inbound capacity before the next requests arrive.
    controller.max_in = 0;

    // Incoming request from the connected outbound peer.
    controller.on_incoming_connection(regular1, IncomingIndex(1));
    assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(!controller.nodes.contains_key(&regular1));

    // Incoming request from the connected inbound peer.
    controller.on_incoming_connection(regular2, IncomingIndex(2));
    assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(!controller.nodes.contains_key(&regular2));
}
1940
/// With a single inbound slot configured, the test expects the first incoming peer to be
/// accepted and the second one to be rejected.
#[test]
fn incoming_peers_that_exceed_slots_are_rejected() {
    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(false);

    let config = ProtoSetConfig {
        in_peers: 1,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));

    let first = PeerId::random();
    let second = PeerId::random();

    // The first peer takes the only inbound slot.
    controller.on_incoming_connection(first, IncomingIndex(1));
    assert_eq!(rx.try_recv(), Ok(Message::Accept(IncomingIndex(1))));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    // The second peer is turned away.
    controller.on_incoming_connection(second, IncomingIndex(2));
    assert_eq!(rx.try_recv(), Ok(Message::Reject(IncomingIndex(2))));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
1971
/// With the peer store reporting the peer as banned, the test expects an incoming connection
/// from a regular peer to be answered with `Reject` and nothing else.
#[test]
fn banned_regular_incoming_node_is_rejected() {
    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(true);

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));

    let incoming_index = IncomingIndex(1);
    controller.on_incoming_connection(PeerId::random(), incoming_index);

    assert_eq!(rx.try_recv(), Ok(Message::Reject(incoming_index)));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
1996
/// With the peer store reporting the peer as banned, the test expects an incoming connection
/// from a reserved peer to be answered with `Reject` and nothing else.
#[test]
fn banned_reserved_incoming_node_is_rejected() {
    let reserved1 = PeerId::random();

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(true);

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: std::iter::once(reserved1).collect(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
    // Sanity check: the peer really is tracked as reserved.
    assert!(controller.reserved_nodes.get(&reserved1).is_some());

    let incoming_index = IncomingIndex(1);
    controller.on_incoming_connection(reserved1, incoming_index);

    assert_eq!(rx.try_recv(), Ok(Message::Reject(incoming_index)));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
2022
/// With the peer store reporting the reserved peer as banned, the test expects slot
/// allocation to send nothing and to leave the peer in the `NotConnected` state.
#[test]
fn we_dont_connect_to_banned_reserved_node() {
    let reserved1 = PeerId::random();

    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(true);
    peer_store.expect_outgoing_candidates().once().return_const(Vec::new());

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: std::iter::once(reserved1).collect(),
        reserved_only: false,
    };
    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));

    // Before allocation the reserved peer is known but not connected.
    assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));

    controller.alloc_slots();

    // After allocation nothing has changed and no message was produced.
    assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
2049}