1use crate::peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT};
33
34use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
35use libp2p::PeerId;
36use log::{debug, error, trace, warn};
37use soil_client::utils::mpsc::{
38 tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
39};
40use std::{
41 collections::{HashMap, HashSet},
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use subsoil::arithmetic::traits::SaturatedConversion;
46use wasm_timer::Delay;
47
48pub const LOG_TARGET: &str = "peerset";
50
51#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
57pub struct SetId(usize);
58
59impl SetId {
60 pub const fn from(id: usize) -> Self {
62 Self(id)
63 }
64}
65
66impl From<usize> for SetId {
67 fn from(id: usize) -> Self {
68 Self(id)
69 }
70}
71
72impl From<SetId> for usize {
73 fn from(id: SetId) -> Self {
74 id.0
75 }
76}
77
78#[derive(Debug)]
80pub struct ProtoSetConfig {
81 pub in_peers: u32,
83
84 pub out_peers: u32,
86
87 pub reserved_nodes: HashSet<PeerId>,
92
93 pub reserved_only: bool,
95}
96
97#[derive(Debug, PartialEq)]
99pub enum Message {
100 Connect {
103 set_id: SetId,
105 peer_id: PeerId,
107 },
108
109 Drop {
111 set_id: SetId,
113 peer_id: PeerId,
115 },
116
117 Accept(IncomingIndex),
119
120 Reject(IncomingIndex),
122}
123
124#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
126pub struct IncomingIndex(pub u64);
127
128impl From<u64> for IncomingIndex {
129 fn from(val: u64) -> Self {
130 Self(val)
131 }
132}
133
134#[derive(Debug)]
136enum Action {
137 AddReservedPeer(PeerId),
139 RemoveReservedPeer(PeerId),
141 SetReservedPeers(HashSet<PeerId>),
143 SetReservedOnly(bool),
145 DisconnectPeer(PeerId),
147 GetReservedPeers(oneshot::Sender<Vec<PeerId>>),
149}
150
151#[derive(Debug)]
153enum Event {
154 IncomingConnection(PeerId, IncomingIndex),
156 Dropped(PeerId),
158}
159
160#[derive(Debug, Clone)]
163pub struct ProtocolHandle {
164 actions_tx: TracingUnboundedSender<Action>,
166 events_tx: TracingUnboundedSender<Event>,
168}
169
170impl ProtocolHandle {
171 pub fn add_reserved_peer(&self, peer_id: PeerId) {
179 let _ = self.actions_tx.unbounded_send(Action::AddReservedPeer(peer_id));
180 }
181
182 pub fn remove_reserved_peer(&self, peer_id: PeerId) {
186 let _ = self.actions_tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
187 }
188
189 pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
191 let _ = self.actions_tx.unbounded_send(Action::SetReservedPeers(peer_ids));
192 }
193
194 pub fn set_reserved_only(&self, reserved: bool) {
197 let _ = self.actions_tx.unbounded_send(Action::SetReservedOnly(reserved));
198 }
199
200 pub fn disconnect_peer(&self, peer_id: PeerId) {
203 let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id));
204 }
205
206 pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
208 let _ = self.actions_tx.unbounded_send(Action::GetReservedPeers(pending_response));
209 }
210
211 pub fn incoming_connection(&self, peer_id: PeerId, incoming_index: IncomingIndex) {
213 let _ = self
214 .events_tx
215 .unbounded_send(Event::IncomingConnection(peer_id, incoming_index));
216 }
217
218 pub fn dropped(&self, peer_id: PeerId) {
220 let _ = self.events_tx.unbounded_send(Event::Dropped(peer_id));
221 }
222}
223
224impl ProtocolHandleT for ProtocolHandle {
225 fn disconnect_peer(&self, peer_id: crate::types::PeerId) {
226 let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id.into()));
227 }
228}
229
230#[derive(Clone, Copy, Debug)]
232enum Direction {
233 Inbound,
234 Outbound,
235}
236
237#[derive(Clone, Debug)]
239enum PeerState {
240 Connected(Direction),
242 NotConnected,
244}
245
246impl PeerState {
247 fn is_connected(&self) -> bool {
249 matches!(self, PeerState::Connected(_))
250 }
251}
252
253impl Default for PeerState {
254 fn default() -> PeerState {
255 PeerState::NotConnected
256 }
257}
258
259#[derive(Debug)]
261pub struct ProtocolController {
262 set_id: SetId,
265 actions_rx: TracingUnboundedReceiver<Action>,
267 events_rx: TracingUnboundedReceiver<Event>,
269 num_in: u32,
271 num_out: u32,
273 max_in: u32,
275 max_out: u32,
277 nodes: HashMap<PeerId, Direction>,
279 reserved_nodes: HashMap<PeerId, PeerState>,
281 reserved_only: bool,
283 next_periodic_alloc_slots: Instant,
285 to_notifications: TracingUnboundedSender<Message>,
287 peer_store: Arc<dyn PeerStoreProvider>,
290}
291
292impl ProtocolController {
293 pub fn new(
295 set_id: SetId,
296 config: ProtoSetConfig,
297 to_notifications: TracingUnboundedSender<Message>,
298 peer_store: Arc<dyn PeerStoreProvider>,
299 ) -> (ProtocolHandle, ProtocolController) {
300 let (actions_tx, actions_rx) = tracing_unbounded("mpsc_api_protocol", 10_000);
301 let (events_tx, events_rx) = tracing_unbounded("mpsc_notifications_protocol", 10_000);
302 let handle = ProtocolHandle { actions_tx, events_tx };
303 peer_store.register_protocol(Arc::new(handle.clone()));
304 let reserved_nodes =
305 config.reserved_nodes.iter().map(|p| (*p, PeerState::NotConnected)).collect();
306 let controller = ProtocolController {
307 set_id,
308 actions_rx,
309 events_rx,
310 num_in: 0,
311 num_out: 0,
312 max_in: config.in_peers,
313 max_out: config.out_peers,
314 nodes: HashMap::new(),
315 reserved_nodes,
316 reserved_only: config.reserved_only,
317 next_periodic_alloc_slots: Instant::now(),
318 to_notifications,
319 peer_store,
320 };
321 (handle, controller)
322 }
323
324 pub async fn run(mut self) {
327 while self.next_action().await {}
328 }
329
330 pub async fn next_action(&mut self) -> bool {
334 let either = loop {
335 let mut next_alloc_slots = Delay::new_at(self.next_periodic_alloc_slots).fuse();
336
337 futures::select_biased! {
339 event = self.events_rx.next() => match event {
340 Some(event) => break Either::Left(event),
341 None => return false,
342 },
343 action = self.actions_rx.next() => match action {
344 Some(action) => break Either::Right(action),
345 None => return false,
346 },
347 _ = next_alloc_slots => {
348 self.alloc_slots();
349 self.next_periodic_alloc_slots = Instant::now() + Duration::new(1, 0);
350 },
351 }
352 };
353
354 match either {
355 Either::Left(event) => self.process_event(event),
356 Either::Right(action) => self.process_action(action),
357 }
358
359 true
360 }
361
362 fn process_event(&mut self, event: Event) {
364 match event {
365 Event::IncomingConnection(peer_id, index) => {
366 self.on_incoming_connection(peer_id, index)
367 },
368 Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
369 }
370 }
371
372 fn process_action(&mut self, action: Action) {
374 match action {
375 Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
376 Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
377 Action::SetReservedPeers(peer_ids) => self.on_set_reserved_peers(peer_ids),
378 Action::SetReservedOnly(reserved_only) => self.on_set_reserved_only(reserved_only),
379 Action::DisconnectPeer(peer_id) => self.on_disconnect_peer(peer_id),
380 Action::GetReservedPeers(pending_response) => {
381 self.on_get_reserved_peers(pending_response)
382 },
383 }
384 }
385
386 fn accept_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
388 trace!(
389 target: LOG_TARGET,
390 "Accepting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
391 self.set_id,
392 self.num_in,
393 self.max_in,
394 );
395
396 let _ = self.to_notifications.unbounded_send(Message::Accept(incoming_index));
397 }
398
399 fn reject_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
401 trace!(
402 target: LOG_TARGET,
403 "Rejecting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
404 self.set_id,
405 self.num_in,
406 self.max_in,
407 );
408
409 let _ = self.to_notifications.unbounded_send(Message::Reject(incoming_index));
410 }
411
412 fn start_connection(&mut self, peer_id: PeerId) {
414 trace!(
415 target: LOG_TARGET,
416 "Connecting to {peer_id} on {:?} ({}/{} num_out/max_out).",
417 self.set_id,
418 self.num_out,
419 self.max_out,
420 );
421
422 let _ = self
423 .to_notifications
424 .unbounded_send(Message::Connect { set_id: self.set_id, peer_id });
425 }
426
427 fn drop_connection(&mut self, peer_id: PeerId) {
429 trace!(
430 target: LOG_TARGET,
431 "Dropping {peer_id} on {:?} ({}/{} num_in/max_in, {}/{} num_out/max_out).",
432 self.set_id,
433 self.num_in,
434 self.max_in,
435 self.num_out,
436 self.max_out,
437 );
438
439 let _ = self
440 .to_notifications
441 .unbounded_send(Message::Drop { set_id: self.set_id, peer_id });
442 }
443
444 fn report_disconnect(&mut self, peer_id: PeerId) {
447 self.peer_store.report_disconnect(peer_id.into());
448 }
449
450 fn is_banned(&self, peer_id: &PeerId) -> bool {
452 self.peer_store.is_banned(&peer_id.into())
453 }
454
455 fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
458 if self.reserved_nodes.contains_key(&peer_id) {
459 debug!(
460 target: LOG_TARGET,
461 "Trying to add an already reserved node {peer_id} as reserved on {:?}.",
462 self.set_id,
463 );
464 return;
465 }
466
467 let state = match self.nodes.remove(&peer_id) {
469 Some(direction) => {
470 trace!(
471 target: LOG_TARGET,
472 "Marking previously connected node {} ({:?}) as reserved on {:?}.",
473 peer_id,
474 direction,
475 self.set_id
476 );
477 PeerState::Connected(direction)
478 },
479 None => {
480 trace!(target: LOG_TARGET, "Adding reserved node {peer_id} on {:?}.", self.set_id,);
481 PeerState::NotConnected
482 },
483 };
484
485 self.reserved_nodes.insert(peer_id, state.clone());
486
487 match state {
489 PeerState::Connected(Direction::Inbound) => self.num_in -= 1,
490 PeerState::Connected(Direction::Outbound) => self.num_out -= 1,
491 PeerState::NotConnected => self.alloc_slots(),
492 }
493 }
494
495 fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
498 let state = match self.reserved_nodes.remove(&peer_id) {
499 Some(state) => state,
500 None => {
501 warn!(
502 target: LOG_TARGET,
503 "Trying to remove unknown reserved node {peer_id} from {:?}.", self.set_id,
504 );
505 return;
506 },
507 };
508
509 if let PeerState::Connected(direction) = state {
510 let disconnect = self.reserved_only
512 || match direction {
513 Direction::Inbound => self.num_in >= self.max_in,
514 Direction::Outbound => self.num_out >= self.max_out,
515 };
516
517 if disconnect {
518 trace!(
520 target: LOG_TARGET,
521 "Disconnecting previously reserved node {peer_id} ({direction:?}) on {:?}.",
522 self.set_id,
523 );
524 self.drop_connection(peer_id);
525 } else {
526 trace!(
528 target: LOG_TARGET,
529 "Making a connected reserved node {peer_id} ({:?}) on {:?} a regular one.",
530 direction,
531 self.set_id,
532 );
533
534 match direction {
535 Direction::Inbound => self.num_in += 1,
536 Direction::Outbound => self.num_out += 1,
537 }
538
539 let prev = self.nodes.insert(peer_id, direction);
541 assert!(prev.is_none(), "Corrupted state: reserved node was also non-reserved.");
542 }
543 } else {
544 trace!(
545 target: LOG_TARGET,
546 "Removed disconnected reserved node {peer_id} from {:?}.",
547 self.set_id,
548 );
549 }
550 }
551
552 fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
554 let current = self.reserved_nodes.keys().cloned().collect();
556 let to_insert = peer_ids.difference(¤t).cloned().collect::<Vec<_>>();
557 let to_remove = current.difference(&peer_ids).cloned().collect::<Vec<_>>();
558
559 for node in to_insert {
560 self.on_add_reserved_peer(node);
561 }
562
563 for node in to_remove {
564 self.on_remove_reserved_peer(node);
565 }
566 }
567
568 fn on_set_reserved_only(&mut self, reserved_only: bool) {
571 trace!(target: LOG_TARGET, "Set reserved only to `{reserved_only}` on {:?}", self.set_id);
572
573 self.reserved_only = reserved_only;
574
575 if !reserved_only {
576 return self.alloc_slots();
577 }
578
579 self.nodes
581 .iter()
582 .map(|(k, v)| (*k, *v))
583 .collect::<Vec<(_, _)>>()
584 .iter()
585 .for_each(|(peer_id, direction)| {
586 match direction {
588 Direction::Inbound => self.num_in -= 1,
589 Direction::Outbound => self.num_out -= 1,
590 }
591 self.drop_connection(*peer_id)
592 });
593 self.nodes.clear();
594 }
595
596 fn on_get_reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
598 let _ = pending_response.send(self.reserved_nodes.keys().cloned().collect());
599 }
600
601 fn on_disconnect_peer(&mut self, peer_id: PeerId) {
603 if self.reserved_nodes.contains_key(&peer_id) {
605 debug!(
606 target: LOG_TARGET,
607 "Ignoring request to disconnect reserved peer {peer_id} from {:?}.", self.set_id,
608 );
609 return;
610 }
611
612 match self.nodes.remove(&peer_id) {
613 Some(direction) => {
614 trace!(
615 target: LOG_TARGET,
616 "Disconnecting peer {peer_id} ({direction:?}) from {:?}.",
617 self.set_id
618 );
619 match direction {
620 Direction::Inbound => self.num_in -= 1,
621 Direction::Outbound => self.num_out -= 1,
622 }
623 self.drop_connection(peer_id);
624 },
625 None => {
626 debug!(
627 target: LOG_TARGET,
628 "Trying to disconnect unknown peer {peer_id} from {:?}.", self.set_id,
629 );
630 },
631 }
632 }
633
634 fn on_incoming_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
646 trace!(
647 target: LOG_TARGET,
648 "Incoming connection from peer {peer_id} ({incoming_index:?}) on {:?}.",
649 self.set_id,
650 );
651
652 if self.reserved_only && !self.reserved_nodes.contains_key(&peer_id) {
653 self.reject_connection(peer_id, incoming_index);
654 return;
655 }
656
657 if let Some(state) = self.reserved_nodes.get_mut(&peer_id) {
659 match state {
660 PeerState::Connected(ref mut direction) => {
661 *direction = Direction::Inbound;
664 self.accept_connection(peer_id, incoming_index);
665 },
666 PeerState::NotConnected => {
667 if self.peer_store.is_banned(&peer_id.into()) {
668 self.reject_connection(peer_id, incoming_index);
669 } else {
670 *state = PeerState::Connected(Direction::Inbound);
671 self.accept_connection(peer_id, incoming_index);
672 }
673 },
674 }
675 return;
676 }
677
678 if let Some(direction) = self.nodes.remove(&peer_id) {
681 trace!(
682 target: LOG_TARGET,
683 "Handling incoming connection from peer {} we think we already connected as {:?} on {:?}.",
684 peer_id,
685 direction,
686 self.set_id
687 );
688 match direction {
689 Direction::Inbound => self.num_in -= 1,
690 Direction::Outbound => self.num_out -= 1,
691 }
692 }
693
694 if self.num_in >= self.max_in {
695 self.reject_connection(peer_id, incoming_index);
696 return;
697 }
698
699 if self.is_banned(&peer_id) {
700 self.reject_connection(peer_id, incoming_index);
701 return;
702 }
703
704 self.num_in += 1;
705 self.nodes.insert(peer_id, Direction::Inbound);
706 self.accept_connection(peer_id, incoming_index);
707 }
708
709 fn on_peer_dropped(&mut self, peer_id: PeerId) {
711 self.on_peer_dropped_inner(peer_id).unwrap_or_else(|peer_id| {
712 trace!(
716 target: LOG_TARGET,
717 "Received `Action::Dropped` for not connected peer {peer_id} on {:?}.",
718 self.set_id,
719 )
720 });
721 }
722
723 fn on_peer_dropped_inner(&mut self, peer_id: PeerId) -> Result<(), PeerId> {
726 if self.drop_reserved_peer(&peer_id)? || self.drop_regular_peer(&peer_id) {
727 self.report_disconnect(peer_id);
729 Ok(())
730 } else {
731 Err(peer_id)
733 }
734 }
735
736 fn drop_reserved_peer(&mut self, peer_id: &PeerId) -> Result<bool, PeerId> {
740 let Some(state) = self.reserved_nodes.get_mut(peer_id) else { return Ok(false) };
741
742 if let PeerState::Connected(direction) = state {
743 trace!(
744 target: LOG_TARGET,
745 "Reserved peer {peer_id} ({direction:?}) dropped from {:?}.",
746 self.set_id,
747 );
748 *state = PeerState::NotConnected;
749 Ok(true)
750 } else {
751 Err(*peer_id)
752 }
753 }
754
755 fn drop_regular_peer(&mut self, peer_id: &PeerId) -> bool {
758 let Some(direction) = self.nodes.remove(peer_id) else { return false };
759
760 trace!(
761 target: LOG_TARGET,
762 "Peer {peer_id} ({direction:?}) dropped from {:?}.",
763 self.set_id,
764 );
765
766 match direction {
767 Direction::Inbound => self.num_in -= 1,
768 Direction::Outbound => self.num_out -= 1,
769 }
770
771 true
772 }
773
774 fn alloc_slots(&mut self) {
777 self.reserved_nodes
779 .iter_mut()
780 .filter_map(|(peer_id, state)| {
781 (!state.is_connected() && !self.peer_store.is_banned(&peer_id.into())).then(|| {
782 *state = PeerState::Connected(Direction::Outbound);
783 peer_id
784 })
785 })
786 .cloned()
787 .collect::<Vec<_>>()
788 .into_iter()
789 .for_each(|peer_id| {
790 self.start_connection(peer_id);
791 });
792
793 if self.reserved_only || self.num_out >= self.max_out {
795 return;
796 }
797
798 let available_slots = (self.max_out - self.num_out).saturated_into();
800
801 let ignored = self
804 .reserved_nodes
805 .keys()
806 .map(From::from)
807 .collect::<HashSet<crate::types::PeerId>>()
808 .union(&self.nodes.keys().map(From::from).collect::<HashSet<crate::types::PeerId>>())
809 .cloned()
810 .collect();
811
812 let candidates = self
813 .peer_store
814 .outgoing_candidates(available_slots, ignored)
815 .into_iter()
816 .filter_map(|peer_id| {
817 (!self.reserved_nodes.contains_key(&peer_id.into())
818 && !self.nodes.contains_key(&peer_id.into()))
819 .then_some(peer_id)
820 .or_else(|| {
821 error!(
822 target: LOG_TARGET,
823 "`PeerStore` returned a node we asked to ignore: {peer_id}.",
824 );
825 debug_assert!(false, "`PeerStore` returned a node we asked to ignore.");
826 None
827 })
828 })
829 .collect::<Vec<_>>();
830
831 if candidates.len() > available_slots {
832 error!(
833 target: LOG_TARGET,
834 "`PeerStore` returned more nodes than there are slots available.",
835 );
836 debug_assert!(false, "`PeerStore` returned more nodes than there are slots available.");
837 }
838
839 candidates.into_iter().take(available_slots).for_each(|peer_id| {
840 self.num_out += 1;
841 self.nodes.insert(peer_id.into(), Direction::Outbound);
842 self.start_connection(peer_id.into());
843 })
844 }
845}
846
847#[cfg(test)]
848mod tests {
849 use super::*;
850 use crate::common::role::ObservedRole;
851 use crate::{
852 peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT},
853 ReputationChange,
854 };
855 use libp2p::PeerId;
856 use soil_client::utils::mpsc::{tracing_unbounded, TryRecvError};
857 use std::collections::HashSet;
858
859 mockall::mock! {
860 #[derive(Debug)]
861 pub PeerStoreHandle {}
862
863 impl PeerStoreProvider for PeerStoreHandle {
864 fn is_banned(&self, peer_id: &crate::types::PeerId) -> bool;
865 fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandleT>);
866 fn report_disconnect(&self, peer_id: crate::types::PeerId);
867 fn set_peer_role(&self, peer_id: &crate::types::PeerId, role: ObservedRole);
868 fn report_peer(&self, peer_id: crate::types::PeerId, change: ReputationChange);
869 fn peer_reputation(&self, peer_id: &crate::types::PeerId) -> i32;
870 fn peer_role(&self, peer_id: &crate::types::PeerId) -> Option<ObservedRole>;
871 fn outgoing_candidates(&self, count: usize, ignored: HashSet<crate::types::PeerId>) -> Vec<crate::types::PeerId>;
872 fn add_known_peer(&self, peer_id: crate::types::PeerId);
873 }
874 }
875
876 #[test]
877 fn reserved_nodes_are_connected_dropped_and_accepted() {
878 let reserved1 = PeerId::random();
879 let reserved2 = PeerId::random();
880
881 let config = ProtoSetConfig {
883 in_peers: 0,
884 out_peers: 0,
885 reserved_nodes: std::iter::once(reserved1).collect(),
886 reserved_only: true,
887 };
888 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
889
890 let mut peer_store = MockPeerStoreHandle::new();
891 peer_store.expect_register_protocol().once().return_const(());
892 peer_store.expect_is_banned().times(4).return_const(false);
893 peer_store.expect_report_disconnect().times(2).return_const(());
894
895 let (_handle, mut controller) =
896 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
897
898 controller.on_add_reserved_peer(reserved2);
900
901 controller.alloc_slots();
904
905 let mut messages = Vec::new();
906 while let Some(message) = rx.try_recv().ok() {
907 messages.push(message);
908 }
909 assert_eq!(messages.len(), 2);
910 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
911 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
912
913 assert_eq!(controller.num_out, 0);
915 assert_eq!(controller.num_in, 0);
916
917 controller.on_peer_dropped(reserved1);
919 controller.on_peer_dropped(reserved2);
920
921 let incoming1 = IncomingIndex(1);
923 controller.on_incoming_connection(reserved1, incoming1);
924 assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming1));
925 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
926
927 let incoming2 = IncomingIndex(2);
929 controller.on_incoming_connection(reserved2, incoming2);
930 assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming2));
931 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
932
933 assert_eq!(controller.num_out, 0);
935 assert_eq!(controller.num_in, 0);
936 }
937
938 #[test]
939 fn banned_reserved_nodes_are_not_connected_and_not_accepted() {
940 let reserved1 = PeerId::random();
941 let reserved2 = PeerId::random();
942
943 let config = ProtoSetConfig {
945 in_peers: 0,
946 out_peers: 0,
947 reserved_nodes: std::iter::once(reserved1).collect(),
948 reserved_only: true,
949 };
950 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
951
952 let mut peer_store = MockPeerStoreHandle::new();
953 peer_store.expect_register_protocol().once().return_const(());
954 peer_store.expect_is_banned().times(6).return_const(true);
955
956 let (_handle, mut controller) =
957 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
958
959 controller.on_add_reserved_peer(reserved2);
961
962 controller.alloc_slots();
964
965 assert_eq!(controller.num_out, 0);
967 assert_eq!(controller.num_in, 0);
968
969 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
971
972 let incoming1 = IncomingIndex(1);
974 controller.on_incoming_connection(reserved1, incoming1);
975 assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming1));
976 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
977
978 let incoming2 = IncomingIndex(2);
980 controller.on_incoming_connection(reserved2, incoming2);
981 assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming2));
982 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
983
984 assert_eq!(controller.num_out, 0);
986 assert_eq!(controller.num_in, 0);
987 }
988
989 #[test]
990 fn we_try_to_reconnect_to_dropped_reserved_nodes() {
991 let reserved1 = PeerId::random();
992 let reserved2 = PeerId::random();
993
994 let config = ProtoSetConfig {
996 in_peers: 0,
997 out_peers: 0,
998 reserved_nodes: std::iter::once(reserved1).collect(),
999 reserved_only: true,
1000 };
1001 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1002
1003 let mut peer_store = MockPeerStoreHandle::new();
1004 peer_store.expect_register_protocol().once().return_const(());
1005 peer_store.expect_is_banned().times(4).return_const(false);
1006 peer_store.expect_report_disconnect().times(2).return_const(());
1007
1008 let (_handle, mut controller) =
1009 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1010
1011 controller.on_add_reserved_peer(reserved2);
1013
1014 controller.alloc_slots();
1016
1017 let mut messages = Vec::new();
1018 while let Some(message) = rx.try_recv().ok() {
1019 messages.push(message);
1020 }
1021
1022 assert_eq!(messages.len(), 2);
1023 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1024 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1025
1026 controller.on_peer_dropped(reserved1);
1028 controller.on_peer_dropped(reserved2);
1029
1030 controller.alloc_slots();
1032
1033 let mut messages = Vec::new();
1034 while let Some(message) = rx.try_recv().ok() {
1035 messages.push(message);
1036 }
1037
1038 assert_eq!(messages.len(), 2);
1039 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1040 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1041
1042 assert_eq!(controller.num_out, 0);
1044 assert_eq!(controller.num_in, 0);
1045 }
1046
1047 #[test]
1048 fn nodes_supplied_by_peer_store_are_connected() {
1049 let peer1 = PeerId::random();
1050 let peer2 = PeerId::random();
1051 let candidates = vec![peer1.into(), peer2.into()];
1052
1053 let config = ProtoSetConfig {
1054 in_peers: 0,
1055 out_peers: 2,
1057 reserved_nodes: HashSet::new(),
1058 reserved_only: false,
1059 };
1060 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1061
1062 let mut peer_store = MockPeerStoreHandle::new();
1063 peer_store.expect_register_protocol().once().return_const(());
1064 peer_store.expect_outgoing_candidates().once().return_const(candidates);
1065
1066 let (_handle, mut controller) =
1067 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1068
1069 controller.alloc_slots();
1071
1072 let mut messages = Vec::new();
1073 while let Some(message) = rx.try_recv().ok() {
1074 messages.push(message);
1075 }
1076
1077 assert_eq!(messages.len(), 2);
1079 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1080 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1081
1082 assert_eq!(controller.num_out, 2);
1084 assert_eq!(controller.num_in, 0);
1085
1086 controller.alloc_slots();
1088 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1089
1090 assert_eq!(controller.num_out, 2);
1092 assert_eq!(controller.num_in, 0);
1093 }
1094
1095 #[test]
1096 fn both_reserved_nodes_and_nodes_supplied_by_peer_store_are_connected() {
1097 let reserved1 = PeerId::random();
1098 let reserved2 = PeerId::random();
1099 let regular1 = PeerId::random();
1100 let regular2 = PeerId::random();
1101 let outgoing_candidates = vec![regular1.into(), regular2.into()];
1102 let reserved_nodes = [reserved1, reserved2].iter().cloned().collect();
1103
1104 let config =
1105 ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false };
1106 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1107
1108 let mut peer_store = MockPeerStoreHandle::new();
1109 peer_store.expect_register_protocol().once().return_const(());
1110 peer_store.expect_is_banned().times(2).return_const(false);
1111 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1112
1113 let (_handle, mut controller) =
1114 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1115
1116 controller.alloc_slots();
1118
1119 let mut messages = Vec::new();
1120 while let Some(message) = rx.try_recv().ok() {
1121 messages.push(message);
1122 }
1123 assert_eq!(messages.len(), 4);
1124 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1125 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1126 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1127 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
1128 assert_eq!(controller.num_out, 2);
1129 assert_eq!(controller.num_in, 0);
1130 }
1131
1132 #[test]
1133 fn if_slots_are_freed_we_try_to_allocate_them_again() {
1134 let peer1 = PeerId::random();
1135 let peer2 = PeerId::random();
1136 let peer3 = PeerId::random();
1137 let candidates1 = vec![peer1.into(), peer2.into()];
1138 let candidates2 = vec![peer3.into()];
1139
1140 let config = ProtoSetConfig {
1141 in_peers: 0,
1142 out_peers: 2,
1144 reserved_nodes: HashSet::new(),
1145 reserved_only: false,
1146 };
1147 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1148
1149 let mut peer_store = MockPeerStoreHandle::new();
1150 peer_store.expect_register_protocol().once().return_const(());
1151 peer_store.expect_outgoing_candidates().once().return_const(candidates1);
1152 peer_store.expect_outgoing_candidates().once().return_const(candidates2);
1153 peer_store.expect_report_disconnect().times(2).return_const(());
1154
1155 let (_handle, mut controller) =
1156 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1157
1158 controller.alloc_slots();
1160
1161 let mut messages = Vec::new();
1162 while let Some(message) = rx.try_recv().ok() {
1163 messages.push(message);
1164 }
1165
1166 assert_eq!(messages.len(), 2);
1168 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1169 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1170
1171 assert_eq!(controller.num_out, 2);
1173 assert_eq!(controller.num_in, 0);
1174
1175 controller.alloc_slots();
1177 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1178
1179 assert_eq!(controller.num_out, 2);
1181 assert_eq!(controller.num_in, 0);
1182
1183 controller.on_peer_dropped(peer1);
1185 controller.on_peer_dropped(peer2);
1186
1187 assert_eq!(controller.num_out, 0);
1189 assert_eq!(controller.num_in, 0);
1190
1191 controller.alloc_slots();
1193
1194 let mut messages = Vec::new();
1195 while let Some(message) = rx.try_recv().ok() {
1196 messages.push(message);
1197 }
1198
1199 assert_eq!(messages.len(), 1);
1201 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
1202
1203 assert_eq!(controller.num_out, 1);
1205 assert_eq!(controller.num_in, 0);
1206 }
1207
1208 #[test]
1209 fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() {
1210 let config = ProtoSetConfig {
1211 in_peers: 0,
1212 out_peers: 2,
1214 reserved_nodes: HashSet::new(),
1215 reserved_only: true,
1216 };
1217 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1218
1219 let mut peer_store = MockPeerStoreHandle::new();
1220 peer_store.expect_register_protocol().once().return_const(());
1221
1222 let (_handle, mut controller) =
1223 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1224
1225 controller.alloc_slots();
1227
1228 assert_eq!(controller.num_out, 0);
1230 assert_eq!(controller.num_in, 0);
1231 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1232 }
1233
1234 #[test]
1235 fn in_reserved_only_mode_no_regular_peers_are_accepted() {
1236 let config = ProtoSetConfig {
1237 in_peers: 2,
1239 out_peers: 0,
1240 reserved_nodes: HashSet::new(),
1241 reserved_only: true,
1242 };
1243 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1244
1245 let mut peer_store = MockPeerStoreHandle::new();
1246 peer_store.expect_register_protocol().once().return_const(());
1247
1248 let (_handle, mut controller) =
1249 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1250
1251 let peer = PeerId::random();
1252 let incoming_index = IncomingIndex(1);
1253 controller.on_incoming_connection(peer, incoming_index);
1254
1255 let mut messages = Vec::new();
1256 while let Some(message) = rx.try_recv().ok() {
1257 messages.push(message);
1258 }
1259
1260 assert_eq!(messages.len(), 1);
1262 assert!(messages.contains(&Message::Reject(incoming_index)));
1263 assert_eq!(controller.num_out, 0);
1264 assert_eq!(controller.num_in, 0);
1265 }
1266
1267 #[test]
1268 fn disabling_reserved_only_mode_allows_to_connect_to_peers() {
1269 let peer1 = PeerId::random();
1270 let peer2 = PeerId::random();
1271 let candidates = vec![peer1.into(), peer2.into()];
1272
1273 let config = ProtoSetConfig {
1274 in_peers: 0,
1275 out_peers: 10,
1277 reserved_nodes: HashSet::new(),
1278 reserved_only: true,
1279 };
1280 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1281
1282 let mut peer_store = MockPeerStoreHandle::new();
1283 peer_store.expect_register_protocol().once().return_const(());
1284 peer_store.expect_outgoing_candidates().once().return_const(candidates);
1285
1286 let (_handle, mut controller) =
1287 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1288
1289 controller.alloc_slots();
1291
1292 assert_eq!(controller.num_out, 0);
1294 assert_eq!(controller.num_in, 0);
1295 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1296
1297 controller.on_set_reserved_only(false);
1299
1300 let mut messages = Vec::new();
1301 while let Some(message) = rx.try_recv().ok() {
1302 messages.push(message);
1303 }
1304
1305 assert_eq!(messages.len(), 2);
1306 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1307 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1308 assert_eq!(controller.num_out, 2);
1309 assert_eq!(controller.num_in, 0);
1310 }
1311
1312 #[test]
1313 fn enabling_reserved_only_mode_disconnects_regular_peers() {
1314 let reserved1 = PeerId::random();
1315 let reserved2 = PeerId::random();
1316 let regular1 = PeerId::random();
1317 let regular2 = PeerId::random();
1318 let outgoing_candidates = vec![regular1.into()];
1319
1320 let config = ProtoSetConfig {
1321 in_peers: 10,
1322 out_peers: 10,
1323 reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1324 reserved_only: false,
1325 };
1326 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1327
1328 let mut peer_store = MockPeerStoreHandle::new();
1329 peer_store.expect_register_protocol().once().return_const(());
1330 peer_store.expect_is_banned().times(3).return_const(false);
1331 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1332
1333 let (_handle, mut controller) =
1334 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1335 assert_eq!(controller.num_out, 0);
1336 assert_eq!(controller.num_in, 0);
1337
1338 controller.alloc_slots();
1340
1341 let mut messages = Vec::new();
1342 while let Some(message) = rx.try_recv().ok() {
1343 messages.push(message);
1344 }
1345 assert_eq!(messages.len(), 3);
1346 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1347 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1348 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1349 assert_eq!(controller.num_out, 1);
1350 assert_eq!(controller.num_in, 0);
1351
1352 let incoming_index = IncomingIndex(1);
1354 controller.on_incoming_connection(regular2, incoming_index);
1355 assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming_index));
1356 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1357 assert_eq!(controller.num_out, 1);
1358 assert_eq!(controller.num_in, 1);
1359
1360 controller.on_set_reserved_only(true);
1362
1363 let mut messages = Vec::new();
1364 while let Some(message) = rx.try_recv().ok() {
1365 messages.push(message);
1366 }
1367 assert_eq!(messages.len(), 2);
1368 assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular1 }));
1369 assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular2 }));
1370 assert_eq!(controller.nodes.len(), 0);
1371 assert_eq!(controller.num_out, 0);
1372 assert_eq!(controller.num_in, 0);
1373 }
1374
1375 #[test]
1376 fn removed_disconnected_reserved_node_is_forgotten() {
1377 let reserved1 = PeerId::random();
1378 let reserved2 = PeerId::random();
1379
1380 let config = ProtoSetConfig {
1381 in_peers: 10,
1382 out_peers: 10,
1383 reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1384 reserved_only: false,
1385 };
1386 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1387
1388 let mut peer_store = MockPeerStoreHandle::new();
1389 peer_store.expect_register_protocol().once().return_const(());
1390
1391 let (_handle, mut controller) =
1392 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1393 assert_eq!(controller.reserved_nodes.len(), 2);
1394 assert_eq!(controller.nodes.len(), 0);
1395 assert_eq!(controller.num_out, 0);
1396 assert_eq!(controller.num_in, 0);
1397
1398 controller.on_remove_reserved_peer(reserved1);
1399 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1400 assert_eq!(controller.reserved_nodes.len(), 1);
1401 assert!(!controller.reserved_nodes.contains_key(&reserved1));
1402 assert_eq!(controller.nodes.len(), 0);
1403 assert_eq!(controller.num_out, 0);
1404 assert_eq!(controller.num_in, 0);
1405 }
1406
1407 #[test]
1408 fn removed_connected_reserved_node_is_disconnected_in_reserved_only_mode() {
1409 let reserved1 = PeerId::random();
1410 let reserved2 = PeerId::random();
1411
1412 let config = ProtoSetConfig {
1413 in_peers: 10,
1414 out_peers: 10,
1415 reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1416 reserved_only: true,
1417 };
1418 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1419
1420 let mut peer_store = MockPeerStoreHandle::new();
1421 peer_store.expect_register_protocol().once().return_const(());
1422 peer_store.expect_is_banned().times(2).return_const(false);
1423
1424 let (_handle, mut controller) =
1425 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1426
1427 controller.alloc_slots();
1429 let mut messages = Vec::new();
1430 while let Some(message) = rx.try_recv().ok() {
1431 messages.push(message);
1432 }
1433 assert_eq!(messages.len(), 2);
1434 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1435 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1436 assert_eq!(controller.reserved_nodes.len(), 2);
1437 assert!(controller.reserved_nodes.contains_key(&reserved1));
1438 assert!(controller.reserved_nodes.contains_key(&reserved2));
1439 assert!(controller.nodes.is_empty());
1440
1441 controller.on_remove_reserved_peer(reserved1);
1443 assert_eq!(
1444 rx.try_recv().unwrap(),
1445 Message::Drop { set_id: SetId::from(0), peer_id: reserved1 }
1446 );
1447 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1448 assert_eq!(controller.reserved_nodes.len(), 1);
1449 assert!(controller.reserved_nodes.contains_key(&reserved2));
1450 assert!(controller.nodes.is_empty());
1451 }
1452
1453 #[test]
1454 fn removed_connected_reserved_nodes_become_regular_in_non_reserved_mode() {
1455 let peer1 = PeerId::random();
1456 let peer2 = PeerId::random();
1457
1458 let config = ProtoSetConfig {
1459 in_peers: 10,
1460 out_peers: 10,
1461 reserved_nodes: [peer1, peer2].iter().cloned().collect(),
1462 reserved_only: false,
1463 };
1464 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1465
1466 let mut peer_store = MockPeerStoreHandle::new();
1467 peer_store.expect_register_protocol().once().return_const(());
1468 peer_store.expect_is_banned().times(2).return_const(false);
1469 peer_store
1470 .expect_outgoing_candidates()
1471 .once()
1472 .return_const(Vec::<crate::types::PeerId>::new());
1473
1474 let (_handle, mut controller) =
1475 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1476
1477 controller.on_incoming_connection(peer1, IncomingIndex(1));
1479 controller.alloc_slots();
1480 let mut messages = Vec::new();
1481 while let Some(message) = rx.try_recv().ok() {
1482 messages.push(message);
1483 }
1484 assert_eq!(messages.len(), 2);
1485 assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1486 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1487 assert_eq!(controller.num_out, 0);
1488 assert_eq!(controller.num_in, 0);
1489
1490 controller.on_remove_reserved_peer(peer1);
1492 controller.on_remove_reserved_peer(peer2);
1493 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1494 assert_eq!(controller.nodes.len(), 2);
1495 assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Inbound)));
1496 assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Outbound)));
1497 assert_eq!(controller.num_out, 1);
1498 assert_eq!(controller.num_in, 1);
1499 }
1500
1501 #[test]
1502 fn regular_nodes_stop_occupying_slots_when_become_reserved() {
1503 let peer1 = PeerId::random();
1504 let peer2 = PeerId::random();
1505 let outgoing_candidates = vec![peer1.into()];
1506
1507 let config = ProtoSetConfig {
1508 in_peers: 10,
1509 out_peers: 10,
1510 reserved_nodes: HashSet::new(),
1511 reserved_only: false,
1512 };
1513 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1514
1515 let mut peer_store = MockPeerStoreHandle::new();
1516 peer_store.expect_register_protocol().once().return_const(());
1517 peer_store.expect_is_banned().once().return_const(false);
1518 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1519
1520 let (_handle, mut controller) =
1521 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1522
1523 controller.alloc_slots();
1525 controller.on_incoming_connection(peer2, IncomingIndex(1));
1526 let mut messages = Vec::new();
1527 while let Some(message) = rx.try_recv().ok() {
1528 messages.push(message);
1529 }
1530 assert_eq!(messages.len(), 2);
1531 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1532 assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1533 assert_eq!(controller.num_in, 1);
1534 assert_eq!(controller.num_out, 1);
1535
1536 controller.on_add_reserved_peer(peer1);
1537 controller.on_add_reserved_peer(peer2);
1538 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1539 assert_eq!(controller.num_in, 0);
1540 assert_eq!(controller.num_out, 0);
1541 }
1542
1543 #[test]
1544 fn disconnecting_regular_peers_work() {
1545 let peer1 = PeerId::random();
1546 let peer2 = PeerId::random();
1547 let outgoing_candidates = vec![peer1.into()];
1548
1549 let config = ProtoSetConfig {
1550 in_peers: 10,
1551 out_peers: 10,
1552 reserved_nodes: HashSet::new(),
1553 reserved_only: false,
1554 };
1555 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1556
1557 let mut peer_store = MockPeerStoreHandle::new();
1558 peer_store.expect_register_protocol().once().return_const(());
1559 peer_store.expect_is_banned().once().return_const(false);
1560 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1561
1562 let (_handle, mut controller) =
1563 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1564
1565 controller.alloc_slots();
1567 controller.on_incoming_connection(peer2, IncomingIndex(1));
1568 let mut messages = Vec::new();
1569 while let Some(message) = rx.try_recv().ok() {
1570 messages.push(message);
1571 }
1572 assert_eq!(messages.len(), 2);
1573 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1574 assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1575 assert_eq!(controller.nodes.len(), 2);
1576 assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1577 assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1578 assert_eq!(controller.num_in, 1);
1579 assert_eq!(controller.num_out, 1);
1580
1581 controller.on_disconnect_peer(peer1);
1582 assert_eq!(
1583 rx.try_recv().unwrap(),
1584 Message::Drop { set_id: SetId::from(0), peer_id: peer1 }
1585 );
1586 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1587 assert_eq!(controller.nodes.len(), 1);
1588 assert!(!controller.nodes.contains_key(&peer1));
1589 assert_eq!(controller.num_in, 1);
1590 assert_eq!(controller.num_out, 0);
1591
1592 controller.on_disconnect_peer(peer2);
1593 assert_eq!(
1594 rx.try_recv().unwrap(),
1595 Message::Drop { set_id: SetId::from(0), peer_id: peer2 }
1596 );
1597 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1598 assert_eq!(controller.nodes.len(), 0);
1599 assert_eq!(controller.num_in, 0);
1600 assert_eq!(controller.num_out, 0);
1601 }
1602
1603 #[test]
1604 fn disconnecting_reserved_peers_is_a_noop() {
1605 let reserved1 = PeerId::random();
1606 let reserved2 = PeerId::random();
1607
1608 let config = ProtoSetConfig {
1609 in_peers: 10,
1610 out_peers: 10,
1611 reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1612 reserved_only: false,
1613 };
1614 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1615
1616 let mut peer_store = MockPeerStoreHandle::new();
1617 peer_store.expect_register_protocol().once().return_const(());
1618 peer_store.expect_is_banned().times(2).return_const(false);
1619 peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1620
1621 let (_handle, mut controller) =
1622 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1623
1624 controller.on_incoming_connection(reserved1, IncomingIndex(1));
1626 controller.alloc_slots();
1627 let mut messages = Vec::new();
1628 while let Some(message) = rx.try_recv().ok() {
1629 messages.push(message);
1630 }
1631 assert_eq!(messages.len(), 2);
1632 assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1633 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1634 assert!(matches!(
1635 controller.reserved_nodes.get(&reserved1),
1636 Some(PeerState::Connected(Direction::Inbound))
1637 ));
1638 assert!(matches!(
1639 controller.reserved_nodes.get(&reserved2),
1640 Some(PeerState::Connected(Direction::Outbound))
1641 ));
1642
1643 controller.on_disconnect_peer(reserved1);
1644 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1645 assert!(matches!(
1646 controller.reserved_nodes.get(&reserved1),
1647 Some(PeerState::Connected(Direction::Inbound))
1648 ));
1649
1650 controller.on_disconnect_peer(reserved2);
1651 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1652 assert!(matches!(
1653 controller.reserved_nodes.get(&reserved2),
1654 Some(PeerState::Connected(Direction::Outbound))
1655 ));
1656 }
1657
1658 #[test]
1659 fn dropping_regular_peers_work() {
1660 let peer1 = PeerId::random();
1661 let peer2 = PeerId::random();
1662 let outgoing_candidates = vec![peer1.into()];
1663
1664 let config = ProtoSetConfig {
1665 in_peers: 10,
1666 out_peers: 10,
1667 reserved_nodes: HashSet::new(),
1668 reserved_only: false,
1669 };
1670 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1671
1672 let mut peer_store = MockPeerStoreHandle::new();
1673 peer_store.expect_register_protocol().once().return_const(());
1674 peer_store.expect_is_banned().once().return_const(false);
1675 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1676 peer_store.expect_report_disconnect().times(2).return_const(());
1677
1678 let (_handle, mut controller) =
1679 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1680
1681 controller.alloc_slots();
1683 controller.on_incoming_connection(peer2, IncomingIndex(1));
1684 let mut messages = Vec::new();
1685 while let Some(message) = rx.try_recv().ok() {
1686 messages.push(message);
1687 }
1688 assert_eq!(messages.len(), 2);
1689 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1690 assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1691 assert_eq!(controller.nodes.len(), 2);
1692 assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1693 assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1694 assert_eq!(controller.num_in, 1);
1695 assert_eq!(controller.num_out, 1);
1696
1697 controller.on_peer_dropped(peer1);
1698 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1699 assert_eq!(controller.nodes.len(), 1);
1700 assert!(!controller.nodes.contains_key(&peer1));
1701 assert_eq!(controller.num_in, 1);
1702 assert_eq!(controller.num_out, 0);
1703
1704 controller.on_peer_dropped(peer2);
1705 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1706 assert_eq!(controller.nodes.len(), 0);
1707 assert_eq!(controller.num_in, 0);
1708 assert_eq!(controller.num_out, 0);
1709 }
1710
1711 #[test]
1712 fn incoming_request_for_connected_reserved_node_switches_it_to_inbound() {
1713 let reserved1 = PeerId::random();
1714 let reserved2 = PeerId::random();
1715
1716 let config = ProtoSetConfig {
1717 in_peers: 10,
1718 out_peers: 10,
1719 reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1720 reserved_only: false,
1721 };
1722 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1723
1724 let mut peer_store = MockPeerStoreHandle::new();
1725 peer_store.expect_register_protocol().once().return_const(());
1726 peer_store.expect_is_banned().times(2).return_const(false);
1727 peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1728
1729 let (_handle, mut controller) =
1730 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1731
1732 controller.on_incoming_connection(reserved1, IncomingIndex(1));
1734 controller.alloc_slots();
1735 let mut messages = Vec::new();
1736 while let Some(message) = rx.try_recv().ok() {
1737 messages.push(message);
1738 }
1739 assert_eq!(messages.len(), 2);
1740 assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1741 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1742 assert!(matches!(
1743 controller.reserved_nodes.get(&reserved1),
1744 Some(PeerState::Connected(Direction::Inbound))
1745 ));
1746 assert!(matches!(
1747 controller.reserved_nodes.get(&reserved2),
1748 Some(PeerState::Connected(Direction::Outbound))
1749 ));
1750
1751 controller.on_incoming_connection(reserved1, IncomingIndex(2));
1753 assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
1754 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1755 assert!(matches!(
1756 controller.reserved_nodes.get(&reserved1),
1757 Some(PeerState::Connected(Direction::Inbound))
1758 ));
1759
1760 controller.on_incoming_connection(reserved2, IncomingIndex(3));
1762 assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(3)));
1763 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1764 assert!(matches!(
1765 controller.reserved_nodes.get(&reserved2),
1766 Some(PeerState::Connected(Direction::Inbound))
1767 ));
1768 }
1769
1770 #[test]
1771 fn incoming_request_for_connected_regular_node_switches_it_to_inbound() {
1772 let regular1 = PeerId::random();
1773 let regular2 = PeerId::random();
1774 let outgoing_candidates = vec![regular1.into()];
1775
1776 let config = ProtoSetConfig {
1777 in_peers: 10,
1778 out_peers: 10,
1779 reserved_nodes: HashSet::new(),
1780 reserved_only: false,
1781 };
1782 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1783
1784 let mut peer_store = MockPeerStoreHandle::new();
1785 peer_store.expect_register_protocol().once().return_const(());
1786 peer_store.expect_is_banned().times(3).return_const(false);
1787 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1788
1789 let (_handle, mut controller) =
1790 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1791 assert_eq!(controller.num_out, 0);
1792 assert_eq!(controller.num_in, 0);
1793
1794 controller.alloc_slots();
1796 assert_eq!(
1797 rx.try_recv().ok().unwrap(),
1798 Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1799 );
1800 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1801 assert!(matches!(controller.nodes.get(®ular1).unwrap(), Direction::Outbound,));
1802
1803 controller.on_incoming_connection(regular2, IncomingIndex(0));
1805 assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1806 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1807 assert!(matches!(controller.nodes.get(®ular2).unwrap(), Direction::Inbound,));
1808
1809 controller.on_incoming_connection(regular1, IncomingIndex(1));
1811 assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(1)));
1812 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1813 assert!(matches!(controller.nodes.get(®ular1).unwrap(), Direction::Inbound,));
1814
1815 controller.on_incoming_connection(regular2, IncomingIndex(2));
1817 assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
1818 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1819 assert!(matches!(controller.nodes.get(®ular2).unwrap(), Direction::Inbound,));
1820 }
1821
1822 #[test]
1823 fn incoming_request_for_connected_node_is_rejected_if_its_banned() {
1824 let regular1 = PeerId::random();
1825 let regular2 = PeerId::random();
1826 let outgoing_candidates = vec![regular1.into()];
1827
1828 let config = ProtoSetConfig {
1829 in_peers: 10,
1830 out_peers: 10,
1831 reserved_nodes: HashSet::new(),
1832 reserved_only: false,
1833 };
1834 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1835
1836 let mut peer_store = MockPeerStoreHandle::new();
1837 peer_store.expect_register_protocol().once().return_const(());
1838 peer_store.expect_is_banned().once().return_const(false);
1839 peer_store.expect_is_banned().times(2).return_const(true);
1840 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1841
1842 let (_handle, mut controller) =
1843 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1844 assert_eq!(controller.num_out, 0);
1845 assert_eq!(controller.num_in, 0);
1846
1847 controller.alloc_slots();
1849 assert_eq!(
1850 rx.try_recv().ok().unwrap(),
1851 Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1852 );
1853 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1854 assert!(matches!(controller.nodes.get(®ular1).unwrap(), Direction::Outbound,));
1855
1856 controller.on_incoming_connection(regular2, IncomingIndex(0));
1858 assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1859 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1860 assert!(matches!(controller.nodes.get(®ular2).unwrap(), Direction::Inbound,));
1861
1862 controller.on_incoming_connection(regular1, IncomingIndex(1));
1864 assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
1865 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1866 assert!(!controller.nodes.contains_key(®ular1));
1867
1868 controller.on_incoming_connection(regular2, IncomingIndex(2));
1870 assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
1871 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1872 assert!(!controller.nodes.contains_key(®ular2));
1873 }
1874
1875 #[test]
1876 fn incoming_request_for_connected_node_is_rejected_if_no_slots_available() {
1877 let regular1 = PeerId::random();
1878 let regular2 = PeerId::random();
1879 let outgoing_candidates = vec![regular1.into()];
1880
1881 let config = ProtoSetConfig {
1882 in_peers: 1,
1883 out_peers: 1,
1884 reserved_nodes: HashSet::new(),
1885 reserved_only: false,
1886 };
1887 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1888
1889 let mut peer_store = MockPeerStoreHandle::new();
1890 peer_store.expect_register_protocol().once().return_const(());
1891 peer_store.expect_is_banned().once().return_const(false);
1892 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1893
1894 let (_handle, mut controller) =
1895 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1896 assert_eq!(controller.num_out, 0);
1897 assert_eq!(controller.num_in, 0);
1898
1899 controller.alloc_slots();
1901 assert_eq!(
1902 rx.try_recv().ok().unwrap(),
1903 Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1904 );
1905 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1906 assert!(matches!(controller.nodes.get(®ular1).unwrap(), Direction::Outbound,));
1907
1908 controller.on_incoming_connection(regular2, IncomingIndex(0));
1910 assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1911 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1912 assert!(matches!(controller.nodes.get(®ular2).unwrap(), Direction::Inbound,));
1913
1914 controller.max_in = 0;
1915
1916 controller.on_incoming_connection(regular1, IncomingIndex(1));
1918 assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
1919 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1920 assert!(!controller.nodes.contains_key(®ular1));
1921
1922 controller.on_incoming_connection(regular2, IncomingIndex(2));
1924 assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
1925 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1926 assert!(!controller.nodes.contains_key(®ular2));
1927 }
1928
1929 #[test]
1930 fn incoming_peers_that_exceed_slots_are_rejected() {
1931 let peer1 = PeerId::random();
1932 let peer2 = PeerId::random();
1933
1934 let config = ProtoSetConfig {
1935 in_peers: 1,
1936 out_peers: 10,
1937 reserved_nodes: HashSet::new(),
1938 reserved_only: false,
1939 };
1940 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1941
1942 let mut peer_store = MockPeerStoreHandle::new();
1943 peer_store.expect_register_protocol().once().return_const(());
1944 peer_store.expect_is_banned().once().return_const(false);
1945
1946 let (_handle, mut controller) =
1947 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1948
1949 controller.on_incoming_connection(peer1, IncomingIndex(1));
1951 assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(1)));
1952 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1953
1954 controller.on_incoming_connection(peer2, IncomingIndex(2));
1956 assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
1957 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1958 }
1959
1960 #[test]
1961 fn banned_regular_incoming_node_is_rejected() {
1962 let peer1 = PeerId::random();
1963
1964 let config = ProtoSetConfig {
1965 in_peers: 10,
1966 out_peers: 10,
1967 reserved_nodes: HashSet::new(),
1968 reserved_only: false,
1969 };
1970 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1971
1972 let mut peer_store = MockPeerStoreHandle::new();
1973 peer_store.expect_register_protocol().once().return_const(());
1974 peer_store.expect_is_banned().once().return_const(true);
1975
1976 let (_handle, mut controller) =
1977 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1978
1979 controller.on_incoming_connection(peer1, IncomingIndex(1));
1981 assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
1982 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1983 }
1984
1985 #[test]
1986 fn banned_reserved_incoming_node_is_rejected() {
1987 let reserved1 = PeerId::random();
1988
1989 let config = ProtoSetConfig {
1990 in_peers: 10,
1991 out_peers: 10,
1992 reserved_nodes: std::iter::once(reserved1).collect(),
1993 reserved_only: false,
1994 };
1995 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1996
1997 let mut peer_store = MockPeerStoreHandle::new();
1998 peer_store.expect_register_protocol().once().return_const(());
1999 peer_store.expect_is_banned().once().return_const(true);
2000
2001 let (_handle, mut controller) =
2002 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
2003 assert!(controller.reserved_nodes.contains_key(&reserved1));
2004
2005 controller.on_incoming_connection(reserved1, IncomingIndex(1));
2007 assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
2008 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
2009 }
2010
2011 #[test]
2012 fn we_dont_connect_to_banned_reserved_node() {
2013 let reserved1 = PeerId::random();
2014
2015 let config = ProtoSetConfig {
2016 in_peers: 10,
2017 out_peers: 10,
2018 reserved_nodes: std::iter::once(reserved1).collect(),
2019 reserved_only: false,
2020 };
2021 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
2022
2023 let mut peer_store = MockPeerStoreHandle::new();
2024 peer_store.expect_register_protocol().once().return_const(());
2025 peer_store.expect_is_banned().once().return_const(true);
2026 peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
2027
2028 let (_handle, mut controller) =
2029 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
2030 assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
2031
2032 controller.alloc_slots();
2034 assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
2035 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
2036 }
2037}