1use crate::peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT};
45
46use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
47use libp2p::PeerId;
48use log::{debug, error, trace, warn};
49use pezsc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
50use pezsp_arithmetic::traits::SaturatedConversion;
51use std::{
52 collections::{HashMap, HashSet},
53 sync::Arc,
54 time::{Duration, Instant},
55};
56use wasm_timer::Delay;
57
/// Log target passed to the `log` macros used throughout this file.
pub const LOG_TARGET: &str = "peerset";
60
/// Identifier of a set: a newtype around a plain `usize` index.
///
/// The wrapped value is private; use the `From` conversions to go to and from `usize`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);
68
69impl SetId {
70 pub const fn from(id: usize) -> Self {
72 Self(id)
73 }
74}
75
76impl From<usize> for SetId {
77 fn from(id: usize) -> Self {
78 Self(id)
79 }
80}
81
82impl From<SetId> for usize {
83 fn from(id: SetId) -> Self {
84 id.0
85 }
86}
87
/// Configuration of a single set, consumed by [`ProtocolController::new`].
#[derive(Debug)]
pub struct ProtoSetConfig {
    /// Maximum number of inbound non-reserved peers (stored as the controller's `max_in`).
    pub in_peers: u32,

    /// Maximum number of outbound non-reserved peers (stored as the controller's `max_out`).
    pub out_peers: u32,

    /// Peers that are reserved from the start; each one is initially recorded as not
    /// connected.
    pub reserved_nodes: HashSet<PeerId>,

    /// Initial value of the reserved-only flag. While the flag is set, the controller rejects
    /// incoming connections from non-reserved peers and does not ask the peer store for
    /// outgoing candidates.
    pub reserved_only: bool,
}
106
/// Message emitted by [`ProtocolController`] on its `to_notifications` channel.
#[derive(Debug, PartialEq)]
pub enum Message {
    /// Request to open a connection to `peer_id` on set `set_id`.
    Connect {
        /// Set the connection belongs to.
        set_id: SetId,
        /// Peer to connect to.
        peer_id: PeerId,
    },

    /// Request to drop the connection to `peer_id` on set `set_id`.
    Drop {
        /// Set the connection belongs to.
        set_id: SetId,
        /// Peer to disconnect from.
        peer_id: PeerId,
    },

    /// The incoming connection identified by the given index is accepted.
    Accept(IncomingIndex),

    /// The incoming connection identified by the given index is rejected.
    Reject(IncomingIndex),
}
133
/// Opaque `u64` identifier of an incoming connection request.
///
/// It arrives with `Event::IncomingConnection` and is echoed back in
/// [`Message::Accept`] / [`Message::Reject`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IncomingIndex(pub u64);
137
138impl From<u64> for IncomingIndex {
139 fn from(val: u64) -> Self {
140 Self(val)
141 }
142}
143
/// Requests sent from a [`ProtocolHandle`] to its [`ProtocolController`] over the actions
/// channel. Each variant is dispatched to the matching `on_*` handler of the controller.
#[derive(Debug)]
enum Action {
    /// Mark the peer as reserved.
    AddReservedPeer(PeerId),
    /// Stop treating the peer as reserved.
    RemoveReservedPeer(PeerId),
    /// Replace the whole set of reserved peers with the given one.
    SetReservedPeers(HashSet<PeerId>),
    /// Turn reserved-only mode on (`true`) or off (`false`).
    SetReservedOnly(bool),
    /// Disconnect the given (non-reserved) peer.
    DisconnectPeer(PeerId),
    /// Send the current list of reserved peers through the provided one-shot channel.
    GetReservedPeers(oneshot::Sender<Vec<PeerId>>),
}
160
/// Notifications sent from a [`ProtocolHandle`] to its [`ProtocolController`] over the events
/// channel.
#[derive(Debug)]
enum Event {
    /// A peer opened an incoming connection; the index identifies the request in the reply.
    IncomingConnection(PeerId, IncomingIndex),
    /// The connection to the peer was dropped.
    Dropped(PeerId),
}
169
/// Cloneable handle used to talk to a [`ProtocolController`].
///
/// It only holds the sending halves of the two channels created in
/// [`ProtocolController::new`].
#[derive(Debug, Clone)]
pub struct ProtocolHandle {
    /// Sender for [`Action`]s.
    actions_tx: TracingUnboundedSender<Action>,
    /// Sender for [`Event`]s.
    events_tx: TracingUnboundedSender<Event>,
}
179
180impl ProtocolHandle {
181 pub fn add_reserved_peer(&self, peer_id: PeerId) {
189 let _ = self.actions_tx.unbounded_send(Action::AddReservedPeer(peer_id));
190 }
191
192 pub fn remove_reserved_peer(&self, peer_id: PeerId) {
196 let _ = self.actions_tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
197 }
198
199 pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
201 let _ = self.actions_tx.unbounded_send(Action::SetReservedPeers(peer_ids));
202 }
203
204 pub fn set_reserved_only(&self, reserved: bool) {
207 let _ = self.actions_tx.unbounded_send(Action::SetReservedOnly(reserved));
208 }
209
210 pub fn disconnect_peer(&self, peer_id: PeerId) {
213 let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id));
214 }
215
216 pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
218 let _ = self.actions_tx.unbounded_send(Action::GetReservedPeers(pending_response));
219 }
220
221 pub fn incoming_connection(&self, peer_id: PeerId, incoming_index: IncomingIndex) {
223 let _ = self
224 .events_tx
225 .unbounded_send(Event::IncomingConnection(peer_id, incoming_index));
226 }
227
228 pub fn dropped(&self, peer_id: PeerId) {
230 let _ = self.events_tx.unbounded_send(Event::Dropped(peer_id));
231 }
232}
233
234impl ProtocolHandleT for ProtocolHandle {
235 fn disconnect_peer(&self, peer_id: pezsc_network_types::PeerId) {
236 let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id.into()));
237 }
238}
239
/// Direction of a connection, as tracked by [`ProtocolController`].
#[derive(Clone, Copy, Debug)]
enum Direction {
    /// Connection recorded when an incoming connection is accepted.
    Inbound,
    /// Connection recorded when the controller itself requests a connect.
    Outbound,
}
246
/// Connection state kept for every reserved peer.
#[derive(Clone, Debug)]
enum PeerState {
    /// The peer is considered connected, in the given direction.
    Connected(Direction),
    /// The peer is not connected.
    NotConnected,
}
255
256impl PeerState {
257 fn is_connected(&self) -> bool {
259 matches!(self, PeerState::Connected(_))
260 }
261}
262
263impl Default for PeerState {
264 fn default() -> PeerState {
265 PeerState::NotConnected
266 }
267}
268
/// Decides which peers to connect to, accept, reject, or drop for one set.
///
/// It consumes [`Action`]s and [`Event`]s sent through a [`ProtocolHandle`] and emits
/// [`Message`]s on `to_notifications`.
#[derive(Debug)]
pub struct ProtocolController {
    /// Set this controller manages; included in `Connect`/`Drop` messages and log lines.
    set_id: SetId,
    /// Receiving half of the actions channel.
    actions_rx: TracingUnboundedReceiver<Action>,
    /// Receiving half of the events channel.
    events_rx: TracingUnboundedReceiver<Event>,
    /// Number of inbound entries in `nodes` (reserved peers are not counted).
    num_in: u32,
    /// Number of outbound entries in `nodes` (reserved peers are not counted).
    num_out: u32,
    /// Limit for `num_in`; incoming regular connections are rejected once it is reached.
    max_in: u32,
    /// Limit for `num_out`; no outgoing candidates are requested once it is reached.
    max_out: u32,
    /// Connected regular (non-reserved) peers and the direction of their connection.
    nodes: HashMap<PeerId, Direction>,
    /// Reserved peers and their connection state.
    reserved_nodes: HashMap<PeerId, PeerState>,
    /// Whether only reserved peers may be connected.
    reserved_only: bool,
    /// Instant at which `next_action` will next run `alloc_slots` on its own.
    next_periodic_alloc_slots: Instant,
    /// Channel on which connect/drop/accept/reject messages are emitted.
    to_notifications: TracingUnboundedSender<Message>,
    /// Peer store queried for bans and outgoing candidates and informed about disconnects.
    peer_store: Arc<dyn PeerStoreProvider>,
}
301
302impl ProtocolController {
303 pub fn new(
305 set_id: SetId,
306 config: ProtoSetConfig,
307 to_notifications: TracingUnboundedSender<Message>,
308 peer_store: Arc<dyn PeerStoreProvider>,
309 ) -> (ProtocolHandle, ProtocolController) {
310 let (actions_tx, actions_rx) = tracing_unbounded("mpsc_api_protocol", 10_000);
311 let (events_tx, events_rx) = tracing_unbounded("mpsc_notifications_protocol", 10_000);
312 let handle = ProtocolHandle { actions_tx, events_tx };
313 peer_store.register_protocol(Arc::new(handle.clone()));
314 let reserved_nodes =
315 config.reserved_nodes.iter().map(|p| (*p, PeerState::NotConnected)).collect();
316 let controller = ProtocolController {
317 set_id,
318 actions_rx,
319 events_rx,
320 num_in: 0,
321 num_out: 0,
322 max_in: config.in_peers,
323 max_out: config.out_peers,
324 nodes: HashMap::new(),
325 reserved_nodes,
326 reserved_only: config.reserved_only,
327 next_periodic_alloc_slots: Instant::now(),
328 to_notifications,
329 peer_store,
330 };
331 (handle, controller)
332 }
333
334 pub async fn run(mut self) {
337 while self.next_action().await {}
338 }
339
340 pub async fn next_action(&mut self) -> bool {
344 let either = loop {
345 let mut next_alloc_slots = Delay::new_at(self.next_periodic_alloc_slots).fuse();
346
347 futures::select_biased! {
349 event = self.events_rx.next() => match event {
350 Some(event) => break Either::Left(event),
351 None => return false,
352 },
353 action = self.actions_rx.next() => match action {
354 Some(action) => break Either::Right(action),
355 None => return false,
356 },
357 _ = next_alloc_slots => {
358 self.alloc_slots();
359 self.next_periodic_alloc_slots = Instant::now() + Duration::new(1, 0);
360 },
361 }
362 };
363
364 match either {
365 Either::Left(event) => self.process_event(event),
366 Either::Right(action) => self.process_action(action),
367 }
368
369 true
370 }
371
372 fn process_event(&mut self, event: Event) {
374 match event {
375 Event::IncomingConnection(peer_id, index) => {
376 self.on_incoming_connection(peer_id, index)
377 },
378 Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
379 }
380 }
381
382 fn process_action(&mut self, action: Action) {
384 match action {
385 Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
386 Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
387 Action::SetReservedPeers(peer_ids) => self.on_set_reserved_peers(peer_ids),
388 Action::SetReservedOnly(reserved_only) => self.on_set_reserved_only(reserved_only),
389 Action::DisconnectPeer(peer_id) => self.on_disconnect_peer(peer_id),
390 Action::GetReservedPeers(pending_response) => {
391 self.on_get_reserved_peers(pending_response)
392 },
393 }
394 }
395
396 fn accept_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
398 trace!(
399 target: LOG_TARGET,
400 "Accepting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
401 self.set_id,
402 self.num_in,
403 self.max_in,
404 );
405
406 let _ = self.to_notifications.unbounded_send(Message::Accept(incoming_index));
407 }
408
409 fn reject_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
411 trace!(
412 target: LOG_TARGET,
413 "Rejecting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
414 self.set_id,
415 self.num_in,
416 self.max_in,
417 );
418
419 let _ = self.to_notifications.unbounded_send(Message::Reject(incoming_index));
420 }
421
422 fn start_connection(&mut self, peer_id: PeerId) {
424 trace!(
425 target: LOG_TARGET,
426 "Connecting to {peer_id} on {:?} ({}/{} num_out/max_out).",
427 self.set_id,
428 self.num_out,
429 self.max_out,
430 );
431
432 let _ = self
433 .to_notifications
434 .unbounded_send(Message::Connect { set_id: self.set_id, peer_id });
435 }
436
437 fn drop_connection(&mut self, peer_id: PeerId) {
439 trace!(
440 target: LOG_TARGET,
441 "Dropping {peer_id} on {:?} ({}/{} num_in/max_in, {}/{} num_out/max_out).",
442 self.set_id,
443 self.num_in,
444 self.max_in,
445 self.num_out,
446 self.max_out,
447 );
448
449 let _ = self
450 .to_notifications
451 .unbounded_send(Message::Drop { set_id: self.set_id, peer_id });
452 }
453
454 fn report_disconnect(&mut self, peer_id: PeerId) {
457 self.peer_store.report_disconnect(peer_id.into());
458 }
459
460 fn is_banned(&self, peer_id: &PeerId) -> bool {
462 self.peer_store.is_banned(&peer_id.into())
463 }
464
465 fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
468 if self.reserved_nodes.contains_key(&peer_id) {
469 debug!(
470 target: LOG_TARGET,
471 "Trying to add an already reserved node {peer_id} as reserved on {:?}.",
472 self.set_id,
473 );
474 return;
475 }
476
477 let state = match self.nodes.remove(&peer_id) {
479 Some(direction) => {
480 trace!(
481 target: LOG_TARGET,
482 "Marking previously connected node {} ({:?}) as reserved on {:?}.",
483 peer_id,
484 direction,
485 self.set_id
486 );
487 PeerState::Connected(direction)
488 },
489 None => {
490 trace!(target: LOG_TARGET, "Adding reserved node {peer_id} on {:?}.", self.set_id,);
491 PeerState::NotConnected
492 },
493 };
494
495 self.reserved_nodes.insert(peer_id, state.clone());
496
497 match state {
499 PeerState::Connected(Direction::Inbound) => self.num_in -= 1,
500 PeerState::Connected(Direction::Outbound) => self.num_out -= 1,
501 PeerState::NotConnected => self.alloc_slots(),
502 }
503 }
504
505 fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
508 let state = match self.reserved_nodes.remove(&peer_id) {
509 Some(state) => state,
510 None => {
511 warn!(
512 target: LOG_TARGET,
513 "Trying to remove unknown reserved node {peer_id} from {:?}.", self.set_id,
514 );
515 return;
516 },
517 };
518
519 if let PeerState::Connected(direction) = state {
520 let disconnect = self.reserved_only
522 || match direction {
523 Direction::Inbound => self.num_in >= self.max_in,
524 Direction::Outbound => self.num_out >= self.max_out,
525 };
526
527 if disconnect {
528 trace!(
530 target: LOG_TARGET,
531 "Disconnecting previously reserved node {peer_id} ({direction:?}) on {:?}.",
532 self.set_id,
533 );
534 self.drop_connection(peer_id);
535 } else {
536 trace!(
538 target: LOG_TARGET,
539 "Making a connected reserved node {peer_id} ({:?}) on {:?} a regular one.",
540 direction,
541 self.set_id,
542 );
543
544 match direction {
545 Direction::Inbound => self.num_in += 1,
546 Direction::Outbound => self.num_out += 1,
547 }
548
549 let prev = self.nodes.insert(peer_id, direction);
551 assert!(prev.is_none(), "Corrupted state: reserved node was also non-reserved.");
552 }
553 } else {
554 trace!(
555 target: LOG_TARGET,
556 "Removed disconnected reserved node {peer_id} from {:?}.",
557 self.set_id,
558 );
559 }
560 }
561
562 fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
564 let current = self.reserved_nodes.keys().cloned().collect();
566 let to_insert = peer_ids.difference(¤t).cloned().collect::<Vec<_>>();
567 let to_remove = current.difference(&peer_ids).cloned().collect::<Vec<_>>();
568
569 for node in to_insert {
570 self.on_add_reserved_peer(node);
571 }
572
573 for node in to_remove {
574 self.on_remove_reserved_peer(node);
575 }
576 }
577
578 fn on_set_reserved_only(&mut self, reserved_only: bool) {
581 trace!(target: LOG_TARGET, "Set reserved only to `{reserved_only}` on {:?}", self.set_id);
582
583 self.reserved_only = reserved_only;
584
585 if !reserved_only {
586 return self.alloc_slots();
587 }
588
589 self.nodes
591 .iter()
592 .map(|(k, v)| (*k, *v))
593 .collect::<Vec<(_, _)>>()
594 .iter()
595 .for_each(|(peer_id, direction)| {
596 match direction {
598 Direction::Inbound => self.num_in -= 1,
599 Direction::Outbound => self.num_out -= 1,
600 }
601 self.drop_connection(*peer_id)
602 });
603 self.nodes.clear();
604 }
605
606 fn on_get_reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
608 let _ = pending_response.send(self.reserved_nodes.keys().cloned().collect());
609 }
610
611 fn on_disconnect_peer(&mut self, peer_id: PeerId) {
613 if self.reserved_nodes.contains_key(&peer_id) {
615 debug!(
616 target: LOG_TARGET,
617 "Ignoring request to disconnect reserved peer {peer_id} from {:?}.", self.set_id,
618 );
619 return;
620 }
621
622 match self.nodes.remove(&peer_id) {
623 Some(direction) => {
624 trace!(
625 target: LOG_TARGET,
626 "Disconnecting peer {peer_id} ({direction:?}) from {:?}.",
627 self.set_id
628 );
629 match direction {
630 Direction::Inbound => self.num_in -= 1,
631 Direction::Outbound => self.num_out -= 1,
632 }
633 self.drop_connection(peer_id);
634 },
635 None => {
636 debug!(
637 target: LOG_TARGET,
638 "Trying to disconnect unknown peer {peer_id} from {:?}.", self.set_id,
639 );
640 },
641 }
642 }
643
644 fn on_incoming_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
656 trace!(
657 target: LOG_TARGET,
658 "Incoming connection from peer {peer_id} ({incoming_index:?}) on {:?}.",
659 self.set_id,
660 );
661
662 if self.reserved_only && !self.reserved_nodes.contains_key(&peer_id) {
663 self.reject_connection(peer_id, incoming_index);
664 return;
665 }
666
667 if let Some(state) = self.reserved_nodes.get_mut(&peer_id) {
669 match state {
670 PeerState::Connected(ref mut direction) => {
671 *direction = Direction::Inbound;
674 self.accept_connection(peer_id, incoming_index);
675 },
676 PeerState::NotConnected => {
677 if self.peer_store.is_banned(&peer_id.into()) {
678 self.reject_connection(peer_id, incoming_index);
679 } else {
680 *state = PeerState::Connected(Direction::Inbound);
681 self.accept_connection(peer_id, incoming_index);
682 }
683 },
684 }
685 return;
686 }
687
688 if let Some(direction) = self.nodes.remove(&peer_id) {
691 trace!(
692 target: LOG_TARGET,
693 "Handling incoming connection from peer {} we think we already connected as {:?} on {:?}.",
694 peer_id,
695 direction,
696 self.set_id
697 );
698 match direction {
699 Direction::Inbound => self.num_in -= 1,
700 Direction::Outbound => self.num_out -= 1,
701 }
702 }
703
704 if self.num_in >= self.max_in {
705 self.reject_connection(peer_id, incoming_index);
706 return;
707 }
708
709 if self.is_banned(&peer_id) {
710 self.reject_connection(peer_id, incoming_index);
711 return;
712 }
713
714 self.num_in += 1;
715 self.nodes.insert(peer_id, Direction::Inbound);
716 self.accept_connection(peer_id, incoming_index);
717 }
718
719 fn on_peer_dropped(&mut self, peer_id: PeerId) {
721 self.on_peer_dropped_inner(peer_id).unwrap_or_else(|peer_id| {
722 trace!(
726 target: LOG_TARGET,
727 "Received `Action::Dropped` for not connected peer {peer_id} on {:?}.",
728 self.set_id,
729 )
730 });
731 }
732
733 fn on_peer_dropped_inner(&mut self, peer_id: PeerId) -> Result<(), PeerId> {
736 if self.drop_reserved_peer(&peer_id)? || self.drop_regular_peer(&peer_id) {
737 self.report_disconnect(peer_id);
739 Ok(())
740 } else {
741 Err(peer_id)
743 }
744 }
745
746 fn drop_reserved_peer(&mut self, peer_id: &PeerId) -> Result<bool, PeerId> {
750 let Some(state) = self.reserved_nodes.get_mut(peer_id) else { return Ok(false) };
751
752 if let PeerState::Connected(direction) = state {
753 trace!(
754 target: LOG_TARGET,
755 "Reserved peer {peer_id} ({direction:?}) dropped from {:?}.",
756 self.set_id,
757 );
758 *state = PeerState::NotConnected;
759 Ok(true)
760 } else {
761 Err(*peer_id)
762 }
763 }
764
765 fn drop_regular_peer(&mut self, peer_id: &PeerId) -> bool {
768 let Some(direction) = self.nodes.remove(peer_id) else { return false };
769
770 trace!(
771 target: LOG_TARGET,
772 "Peer {peer_id} ({direction:?}) dropped from {:?}.",
773 self.set_id,
774 );
775
776 match direction {
777 Direction::Inbound => self.num_in -= 1,
778 Direction::Outbound => self.num_out -= 1,
779 }
780
781 true
782 }
783
784 fn alloc_slots(&mut self) {
787 self.reserved_nodes
789 .iter_mut()
790 .filter_map(|(peer_id, state)| {
791 (!state.is_connected() && !self.peer_store.is_banned(&peer_id.into())).then(|| {
792 *state = PeerState::Connected(Direction::Outbound);
793 peer_id
794 })
795 })
796 .cloned()
797 .collect::<Vec<_>>()
798 .into_iter()
799 .for_each(|peer_id| {
800 self.start_connection(peer_id);
801 });
802
803 if self.reserved_only || self.num_out >= self.max_out {
805 return;
806 }
807
808 let available_slots = (self.max_out - self.num_out).saturated_into();
810
811 let ignored = self
814 .reserved_nodes
815 .keys()
816 .map(From::from)
817 .collect::<HashSet<pezsc_network_types::PeerId>>()
818 .union(
819 &self
820 .nodes
821 .keys()
822 .map(From::from)
823 .collect::<HashSet<pezsc_network_types::PeerId>>(),
824 )
825 .cloned()
826 .collect();
827
828 let candidates = self
829 .peer_store
830 .outgoing_candidates(available_slots, ignored)
831 .into_iter()
832 .filter_map(|peer_id| {
833 (!self.reserved_nodes.contains_key(&peer_id.into())
834 && !self.nodes.contains_key(&peer_id.into()))
835 .then_some(peer_id)
836 .or_else(|| {
837 error!(
838 target: LOG_TARGET,
839 "`PeerStore` returned a node we asked to ignore: {peer_id}.",
840 );
841 debug_assert!(false, "`PeerStore` returned a node we asked to ignore.");
842 None
843 })
844 })
845 .collect::<Vec<_>>();
846
847 if candidates.len() > available_slots {
848 error!(
849 target: LOG_TARGET,
850 "`PeerStore` returned more nodes than there are slots available.",
851 );
852 debug_assert!(false, "`PeerStore` returned more nodes than there are slots available.");
853 }
854
855 candidates.into_iter().take(available_slots).for_each(|peer_id| {
856 self.num_out += 1;
857 self.nodes.insert(peer_id.into(), Direction::Outbound);
858 self.start_connection(peer_id.into());
859 })
860 }
861}
862
863#[cfg(test)]
864mod tests {
865 use super::*;
866 use crate::{
867 peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT},
868 ReputationChange,
869 };
870 use libp2p::PeerId;
871 use pezsc_network_common::role::ObservedRole;
872 use pezsc_utils::mpsc::{tracing_unbounded, TryRecvError};
873 use std::collections::HashSet;
874
    // `mockall`-generated mock of `PeerStoreProvider` (the tests refer to it as
    // `MockPeerStoreHandle`). The tests set expectations on how often the controller calls
    // into the peer store (`expect_is_banned().times(..)`, `expect_outgoing_candidates()`, ...).
    mockall::mock! {
        #[derive(Debug)]
        pub PeerStoreHandle {}

        impl PeerStoreProvider for PeerStoreHandle {
            fn is_banned(&self, peer_id: &pezsc_network_types::PeerId) -> bool;
            fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandleT>);
            fn report_disconnect(&self, peer_id: pezsc_network_types::PeerId);
            fn set_peer_role(&self, peer_id: &pezsc_network_types::PeerId, role: ObservedRole);
            fn report_peer(&self, peer_id: pezsc_network_types::PeerId, change: ReputationChange);
            fn peer_reputation(&self, peer_id: &pezsc_network_types::PeerId) -> i32;
            fn peer_role(&self, peer_id: &pezsc_network_types::PeerId) -> Option<ObservedRole>;
            fn outgoing_candidates(&self, count: usize, ignored: HashSet<pezsc_network_types::PeerId>) -> Vec<pezsc_network_types::PeerId>;
            fn add_known_peer(&self, peer_id: pezsc_network_types::PeerId);
        }
    }
891
892 #[test]
893 fn reserved_nodes_are_connected_dropped_and_accepted() {
894 let reserved1 = PeerId::random();
895 let reserved2 = PeerId::random();
896
897 let config = ProtoSetConfig {
899 in_peers: 0,
900 out_peers: 0,
901 reserved_nodes: std::iter::once(reserved1).collect(),
902 reserved_only: true,
903 };
904 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
905
906 let mut peer_store = MockPeerStoreHandle::new();
907 peer_store.expect_register_protocol().once().return_const(());
908 peer_store.expect_is_banned().times(4).return_const(false);
909 peer_store.expect_report_disconnect().times(2).return_const(());
910
911 let (_handle, mut controller) =
912 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
913
914 controller.on_add_reserved_peer(reserved2);
916
917 controller.alloc_slots();
920
921 let mut messages = Vec::new();
922 while let Some(message) = rx.try_recv().ok() {
923 messages.push(message);
924 }
925 assert_eq!(messages.len(), 2);
926 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
927 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
928
929 assert_eq!(controller.num_out, 0);
931 assert_eq!(controller.num_in, 0);
932
933 controller.on_peer_dropped(reserved1);
935 controller.on_peer_dropped(reserved2);
936
937 let incoming1 = IncomingIndex(1);
939 controller.on_incoming_connection(reserved1, incoming1);
940 assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming1));
941 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
942
943 let incoming2 = IncomingIndex(2);
945 controller.on_incoming_connection(reserved2, incoming2);
946 assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming2));
947 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
948
949 assert_eq!(controller.num_out, 0);
951 assert_eq!(controller.num_in, 0);
952 }
953
954 #[test]
955 fn banned_reserved_nodes_are_not_connected_and_not_accepted() {
956 let reserved1 = PeerId::random();
957 let reserved2 = PeerId::random();
958
959 let config = ProtoSetConfig {
961 in_peers: 0,
962 out_peers: 0,
963 reserved_nodes: std::iter::once(reserved1).collect(),
964 reserved_only: true,
965 };
966 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
967
968 let mut peer_store = MockPeerStoreHandle::new();
969 peer_store.expect_register_protocol().once().return_const(());
970 peer_store.expect_is_banned().times(6).return_const(true);
971
972 let (_handle, mut controller) =
973 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
974
975 controller.on_add_reserved_peer(reserved2);
977
978 controller.alloc_slots();
980
981 assert_eq!(controller.num_out, 0);
983 assert_eq!(controller.num_in, 0);
984
985 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
987
988 let incoming1 = IncomingIndex(1);
990 controller.on_incoming_connection(reserved1, incoming1);
991 assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming1));
992 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
993
994 let incoming2 = IncomingIndex(2);
996 controller.on_incoming_connection(reserved2, incoming2);
997 assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming2));
998 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
999
1000 assert_eq!(controller.num_out, 0);
1002 assert_eq!(controller.num_in, 0);
1003 }
1004
1005 #[test]
1006 fn we_try_to_reconnect_to_dropped_reserved_nodes() {
1007 let reserved1 = PeerId::random();
1008 let reserved2 = PeerId::random();
1009
1010 let config = ProtoSetConfig {
1012 in_peers: 0,
1013 out_peers: 0,
1014 reserved_nodes: std::iter::once(reserved1).collect(),
1015 reserved_only: true,
1016 };
1017 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1018
1019 let mut peer_store = MockPeerStoreHandle::new();
1020 peer_store.expect_register_protocol().once().return_const(());
1021 peer_store.expect_is_banned().times(4).return_const(false);
1022 peer_store.expect_report_disconnect().times(2).return_const(());
1023
1024 let (_handle, mut controller) =
1025 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1026
1027 controller.on_add_reserved_peer(reserved2);
1029
1030 controller.alloc_slots();
1032
1033 let mut messages = Vec::new();
1034 while let Some(message) = rx.try_recv().ok() {
1035 messages.push(message);
1036 }
1037
1038 assert_eq!(messages.len(), 2);
1039 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1040 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1041
1042 controller.on_peer_dropped(reserved1);
1044 controller.on_peer_dropped(reserved2);
1045
1046 controller.alloc_slots();
1048
1049 let mut messages = Vec::new();
1050 while let Some(message) = rx.try_recv().ok() {
1051 messages.push(message);
1052 }
1053
1054 assert_eq!(messages.len(), 2);
1055 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1056 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1057
1058 assert_eq!(controller.num_out, 0);
1060 assert_eq!(controller.num_in, 0);
1061 }
1062
1063 #[test]
1064 fn nodes_supplied_by_peer_store_are_connected() {
1065 let peer1 = PeerId::random();
1066 let peer2 = PeerId::random();
1067 let candidates = vec![peer1.into(), peer2.into()];
1068
1069 let config = ProtoSetConfig {
1070 in_peers: 0,
1071 out_peers: 2,
1073 reserved_nodes: HashSet::new(),
1074 reserved_only: false,
1075 };
1076 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1077
1078 let mut peer_store = MockPeerStoreHandle::new();
1079 peer_store.expect_register_protocol().once().return_const(());
1080 peer_store.expect_outgoing_candidates().once().return_const(candidates);
1081
1082 let (_handle, mut controller) =
1083 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1084
1085 controller.alloc_slots();
1087
1088 let mut messages = Vec::new();
1089 while let Some(message) = rx.try_recv().ok() {
1090 messages.push(message);
1091 }
1092
1093 assert_eq!(messages.len(), 2);
1095 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1096 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1097
1098 assert_eq!(controller.num_out, 2);
1100 assert_eq!(controller.num_in, 0);
1101
1102 controller.alloc_slots();
1104 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1105
1106 assert_eq!(controller.num_out, 2);
1108 assert_eq!(controller.num_in, 0);
1109 }
1110
1111 #[test]
1112 fn both_reserved_nodes_and_nodes_supplied_by_peer_store_are_connected() {
1113 let reserved1 = PeerId::random();
1114 let reserved2 = PeerId::random();
1115 let regular1 = PeerId::random();
1116 let regular2 = PeerId::random();
1117 let outgoing_candidates = vec![regular1.into(), regular2.into()];
1118 let reserved_nodes = [reserved1, reserved2].iter().cloned().collect();
1119
1120 let config =
1121 ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false };
1122 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1123
1124 let mut peer_store = MockPeerStoreHandle::new();
1125 peer_store.expect_register_protocol().once().return_const(());
1126 peer_store.expect_is_banned().times(2).return_const(false);
1127 peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1128
1129 let (_handle, mut controller) =
1130 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1131
1132 controller.alloc_slots();
1134
1135 let mut messages = Vec::new();
1136 while let Some(message) = rx.try_recv().ok() {
1137 messages.push(message);
1138 }
1139 assert_eq!(messages.len(), 4);
1140 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1141 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1142 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1143 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
1144 assert_eq!(controller.num_out, 2);
1145 assert_eq!(controller.num_in, 0);
1146 }
1147
1148 #[test]
1149 fn if_slots_are_freed_we_try_to_allocate_them_again() {
1150 let peer1 = PeerId::random();
1151 let peer2 = PeerId::random();
1152 let peer3 = PeerId::random();
1153 let candidates1 = vec![peer1.into(), peer2.into()];
1154 let candidates2 = vec![peer3.into()];
1155
1156 let config = ProtoSetConfig {
1157 in_peers: 0,
1158 out_peers: 2,
1160 reserved_nodes: HashSet::new(),
1161 reserved_only: false,
1162 };
1163 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1164
1165 let mut peer_store = MockPeerStoreHandle::new();
1166 peer_store.expect_register_protocol().once().return_const(());
1167 peer_store.expect_outgoing_candidates().once().return_const(candidates1);
1168 peer_store.expect_outgoing_candidates().once().return_const(candidates2);
1169 peer_store.expect_report_disconnect().times(2).return_const(());
1170
1171 let (_handle, mut controller) =
1172 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1173
1174 controller.alloc_slots();
1176
1177 let mut messages = Vec::new();
1178 while let Some(message) = rx.try_recv().ok() {
1179 messages.push(message);
1180 }
1181
1182 assert_eq!(messages.len(), 2);
1184 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1185 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1186
1187 assert_eq!(controller.num_out, 2);
1189 assert_eq!(controller.num_in, 0);
1190
1191 controller.alloc_slots();
1193 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1194
1195 assert_eq!(controller.num_out, 2);
1197 assert_eq!(controller.num_in, 0);
1198
1199 controller.on_peer_dropped(peer1);
1201 controller.on_peer_dropped(peer2);
1202
1203 assert_eq!(controller.num_out, 0);
1205 assert_eq!(controller.num_in, 0);
1206
1207 controller.alloc_slots();
1209
1210 let mut messages = Vec::new();
1211 while let Some(message) = rx.try_recv().ok() {
1212 messages.push(message);
1213 }
1214
1215 assert_eq!(messages.len(), 1);
1217 assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
1218
1219 assert_eq!(controller.num_out, 1);
1221 assert_eq!(controller.num_in, 0);
1222 }
1223
1224 #[test]
1225 fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() {
1226 let config = ProtoSetConfig {
1227 in_peers: 0,
1228 out_peers: 2,
1230 reserved_nodes: HashSet::new(),
1231 reserved_only: true,
1232 };
1233 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1234
1235 let mut peer_store = MockPeerStoreHandle::new();
1236 peer_store.expect_register_protocol().once().return_const(());
1237
1238 let (_handle, mut controller) =
1239 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1240
1241 controller.alloc_slots();
1243
1244 assert_eq!(controller.num_out, 0);
1246 assert_eq!(controller.num_in, 0);
1247 assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1248 }
1249
1250 #[test]
1251 fn in_reserved_only_mode_no_regular_peers_are_accepted() {
1252 let config = ProtoSetConfig {
1253 in_peers: 2,
1255 out_peers: 0,
1256 reserved_nodes: HashSet::new(),
1257 reserved_only: true,
1258 };
1259 let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1260
1261 let mut peer_store = MockPeerStoreHandle::new();
1262 peer_store.expect_register_protocol().once().return_const(());
1263
1264 let (_handle, mut controller) =
1265 ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1266
1267 let peer = PeerId::random();
1268 let incoming_index = IncomingIndex(1);
1269 controller.on_incoming_connection(peer, incoming_index);
1270
1271 let mut messages = Vec::new();
1272 while let Some(message) = rx.try_recv().ok() {
1273 messages.push(message);
1274 }
1275
1276 assert_eq!(messages.len(), 1);
1278 assert!(messages.contains(&Message::Reject(incoming_index)));
1279 assert_eq!(controller.num_out, 0);
1280 assert_eq!(controller.num_in, 0);
1281 }
1282
/// While `reserved_only` is set, `alloc_slots` emits nothing; after
/// `on_set_reserved_only(false)` both candidates supplied by the mocked peer store receive a
/// `Message::Connect` and `num_out` becomes 2.
#[test]
fn disabling_reserved_only_mode_allows_to_connect_to_peers() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let candidates = vec![peer1.into(), peer2.into()];

    let proto_config = ProtoSetConfig {
        in_peers: 0,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: true,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    // The mock allows exactly one request for outgoing candidates during the whole test.
    store.expect_outgoing_candidates().once().return_const(candidates);

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));

    // Still reserved-only: allocating slots must stay silent and leave the counters at zero.
    controller.alloc_slots();
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));

    // Leave reserved-only mode.
    controller.on_set_reserved_only(false);

    let emitted: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();

    // Both candidates are dialed and counted as outbound.
    assert_eq!(emitted.len(), 2);
    for peer_id in [peer1, peer2].iter().copied() {
        assert!(emitted.contains(&Message::Connect { set_id, peer_id }));
    }
    assert_eq!(controller.num_out, 2);
    assert_eq!(controller.num_in, 0);
}
1327
/// After one outbound and one inbound regular peer are connected, switching to reserved-only
/// mode emits a `Message::Drop` for each of them and resets `nodes`, `num_out` and `num_in`.
#[test]
fn enabling_reserved_only_mode_disconnects_regular_peers() {
    let set_id = SetId::from(0);
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();
    let regular1 = PeerId::random();
    let regular2 = PeerId::random();
    let outgoing_candidates = vec![regular1.into()];

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    store.expect_is_banned().times(3).return_const(false);
    store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Outgoing side: both reserved peers and the single regular candidate get a `Connect`.
    controller.alloc_slots();

    let connects: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();
    assert_eq!(connects.len(), 3);
    for peer_id in [reserved1, reserved2, regular1].iter().copied() {
        assert!(connects.contains(&Message::Connect { set_id, peer_id }));
    }
    // Three connects were issued, yet `num_out` is 1.
    assert_eq!(controller.num_out, 1);
    assert_eq!(controller.num_in, 0);

    // Incoming side: a second regular peer is accepted and counted in `num_in`.
    let index = IncomingIndex(1);
    controller.on_incoming_connection(regular2, index);
    assert_eq!(from_controller.try_recv().ok(), Some(Message::Accept(index)));
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert_eq!(controller.num_out, 1);
    assert_eq!(controller.num_in, 1);

    // Switch to reserved-only mode.
    controller.on_set_reserved_only(true);

    let drops: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();

    // Both regular peers are dropped and no longer tracked.
    assert_eq!(drops.len(), 2);
    for peer_id in [regular1, regular2].iter().copied() {
        assert!(drops.contains(&Message::Drop { set_id, peer_id }));
    }
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);
}
1390
/// Removing a reserved peer before any connection was attempted deletes it from
/// `reserved_nodes` without emitting a message or touching `nodes` and the slot counters.
#[test]
fn removed_disconnected_reserved_node_is_forgotten() {
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), proto_config, to_notifications, Arc::new(store));

    // Freshly built controller: both reserved peers are known, no regular node is tracked.
    assert_eq!(controller.reserved_nodes.len(), 2);
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    controller.on_remove_reserved_peer(reserved1);

    // Nothing is sent, and only `reserved2` remains in the reserved set.
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert_eq!(controller.reserved_nodes.len(), 1);
    assert!(!controller.reserved_nodes.contains_key(&reserved1));
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);
}
1422
/// In reserved-only mode, removing a reserved peer that has already been dialed emits a
/// `Message::Drop` for it and leaves only the other reserved peer in `reserved_nodes`.
#[test]
fn removed_connected_reserved_node_is_disconnected_in_reserved_only_mode() {
    let set_id = SetId::from(0);
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: true,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    store.expect_is_banned().times(2).return_const(false);

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));

    // Dial the reserved peers.
    controller.alloc_slots();
    let emitted: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();
    assert_eq!(emitted.len(), 2);
    for peer_id in [reserved1, reserved2].iter().copied() {
        assert!(emitted.contains(&Message::Connect { set_id, peer_id }));
    }
    assert_eq!(controller.reserved_nodes.len(), 2);
    assert!(controller.reserved_nodes.contains_key(&reserved1));
    assert!(controller.reserved_nodes.contains_key(&reserved2));
    assert!(controller.nodes.is_empty());

    // Un-reserve the first peer: it must be dropped and must not become a regular node.
    controller.on_remove_reserved_peer(reserved1);
    assert_eq!(
        from_controller.try_recv().ok(),
        Some(Message::Drop { set_id, peer_id: reserved1 })
    );
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert_eq!(controller.reserved_nodes.len(), 1);
    assert!(controller.reserved_nodes.contains_key(&reserved2));
    assert!(controller.nodes.is_empty());
}
1468
/// Outside reserved-only mode, un-reserving connected reserved peers keeps them connected:
/// they show up in `nodes` with their direction and start counting towards `num_in` /
/// `num_out`, and no message is emitted.
#[test]
fn removed_connected_reserved_nodes_become_regular_in_non_reserved_mode() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [peer1, peer2].iter().copied().collect(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    store.expect_is_banned().times(2).return_const(false);
    // No regular candidates are offered by the mocked peer store.
    store
        .expect_outgoing_candidates()
        .once()
        .return_const(Vec::<pezsc_network_types::PeerId>::new());

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));

    // Connect `peer1` as inbound, then let the controller allocate slots.
    controller.on_incoming_connection(peer1, IncomingIndex(1));
    controller.alloc_slots();

    let emitted: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();
    assert_eq!(emitted.len(), 2);
    assert!(emitted.contains(&Message::Accept(IncomingIndex(1))));
    assert!(emitted.contains(&Message::Connect { set_id, peer_id: peer2 }));
    // While both peers are reserved the slot counters stay at zero.
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Un-reserve both peers.
    controller.on_remove_reserved_peer(peer1);
    controller.on_remove_reserved_peer(peer2);

    // They are now tracked as regular nodes and occupy one slot each.
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert_eq!(controller.nodes.len(), 2);
    assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Inbound)));
    assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Outbound)));
    assert_eq!(controller.num_out, 1);
    assert_eq!(controller.num_in, 1);
}
1516
/// Promoting connected regular peers to reserved via `on_add_reserved_peer` emits no message
/// and brings `num_in` / `num_out` back to zero.
#[test]
fn regular_nodes_stop_occupying_slots_when_become_reserved() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let outgoing_candidates = vec![peer1.into()];

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    store.expect_is_banned().once().return_const(false);
    store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));

    // Connect `peer1` as outbound (peer-store candidate) and `peer2` as inbound.
    controller.alloc_slots();
    controller.on_incoming_connection(peer2, IncomingIndex(1));

    let emitted: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();
    assert_eq!(emitted.len(), 2);
    assert!(emitted.contains(&Message::Connect { set_id, peer_id: peer1 }));
    assert!(emitted.contains(&Message::Accept(IncomingIndex(1))));
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 1);

    // Make both peers reserved.
    for peer in [peer1, peer2].iter().copied() {
        controller.on_add_reserved_peer(peer);
    }

    // No traffic is generated and both slots are freed.
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert_eq!(controller.num_in, 0);
    assert_eq!(controller.num_out, 0);
}
1558
/// `on_disconnect_peer` on a connected regular peer emits a `Message::Drop`, removes the peer
/// from `nodes` and releases the slot matching its direction.
#[test]
fn disconnecting_regular_peers_work() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let outgoing_candidates = vec![peer1.into()];

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    store.expect_is_banned().once().return_const(false);
    store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));

    // Connect `peer1` as outbound (peer-store candidate) and `peer2` as inbound.
    controller.alloc_slots();
    controller.on_incoming_connection(peer2, IncomingIndex(1));

    let emitted: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();
    assert_eq!(emitted.len(), 2);
    assert!(emitted.contains(&Message::Connect { set_id, peer_id: peer1 }));
    assert!(emitted.contains(&Message::Accept(IncomingIndex(1))));
    assert_eq!(controller.nodes.len(), 2);
    assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
    assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 1);

    // Disconnect the outbound peer: one `Drop`, outbound slot released.
    controller.on_disconnect_peer(peer1);
    assert_eq!(
        from_controller.try_recv().ok(),
        Some(Message::Drop { set_id, peer_id: peer1 })
    );
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert_eq!(controller.nodes.len(), 1);
    assert!(!controller.nodes.contains_key(&peer1));
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 0);

    // Disconnect the inbound peer: one `Drop`, inbound slot released.
    controller.on_disconnect_peer(peer2);
    assert_eq!(
        from_controller.try_recv().ok(),
        Some(Message::Drop { set_id, peer_id: peer2 })
    );
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_in, 0);
    assert_eq!(controller.num_out, 0);
}
1618
/// `on_disconnect_peer` on a connected reserved peer emits nothing and leaves the peer's
/// state in `reserved_nodes` unchanged.
#[test]
fn disconnecting_reserved_peers_is_a_noop() {
    let set_id = SetId::from(0);
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    store.expect_is_banned().times(2).return_const(false);
    store.expect_outgoing_candidates().once().return_const(Vec::new());

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));

    // `reserved1` connects to us; `alloc_slots` then runs with `reserved2` still unconnected.
    controller.on_incoming_connection(reserved1, IncomingIndex(1));
    controller.alloc_slots();

    let emitted: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();
    assert_eq!(emitted.len(), 2);
    assert!(emitted.contains(&Message::Accept(IncomingIndex(1))));
    assert!(emitted.contains(&Message::Connect { set_id, peer_id: reserved2 }));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved1),
        Some(PeerState::Connected(Direction::Inbound))
    ));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved2),
        Some(PeerState::Connected(Direction::Outbound))
    ));

    // Disconnect request for the inbound reserved peer: silent, state untouched.
    controller.on_disconnect_peer(reserved1);
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved1),
        Some(PeerState::Connected(Direction::Inbound))
    ));

    // Disconnect request for the outbound reserved peer: silent, state untouched.
    controller.on_disconnect_peer(reserved2);
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved2),
        Some(PeerState::Connected(Direction::Outbound))
    ));
}
1673
/// `on_peer_dropped` for a connected regular peer removes it from `nodes` and releases its
/// slot without emitting a message; the mocked peer store expects `report_disconnect` twice.
#[test]
fn dropping_regular_peers_work() {
    let set_id = SetId::from(0);
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let outgoing_candidates = vec![peer1.into()];

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    store.expect_is_banned().once().return_const(false);
    store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
    store.expect_report_disconnect().times(2).return_const(());

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));

    // Connect `peer1` as outbound (peer-store candidate) and `peer2` as inbound.
    controller.alloc_slots();
    controller.on_incoming_connection(peer2, IncomingIndex(1));

    let emitted: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();
    assert_eq!(emitted.len(), 2);
    assert!(emitted.contains(&Message::Connect { set_id, peer_id: peer1 }));
    assert!(emitted.contains(&Message::Accept(IncomingIndex(1))));
    assert_eq!(controller.nodes.len(), 2);
    assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
    assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 1);

    // The outbound peer goes away: nothing is sent, outbound slot released.
    controller.on_peer_dropped(peer1);
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert_eq!(controller.nodes.len(), 1);
    assert!(!controller.nodes.contains_key(&peer1));
    assert_eq!(controller.num_in, 1);
    assert_eq!(controller.num_out, 0);

    // The inbound peer goes away: nothing is sent, inbound slot released.
    controller.on_peer_dropped(peer2);
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert!(controller.nodes.is_empty());
    assert_eq!(controller.num_in, 0);
    assert_eq!(controller.num_out, 0);
}
1726
/// A new incoming request from an already connected reserved peer is accepted, and the peer
/// ends up as `Connected(Inbound)` regardless of its previous direction.
#[test]
fn incoming_request_for_connected_reserved_node_switches_it_to_inbound() {
    let set_id = SetId::from(0);
    let reserved1 = PeerId::random();
    let reserved2 = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1, reserved2].iter().copied().collect(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    store.expect_is_banned().times(2).return_const(false);
    store.expect_outgoing_candidates().once().return_const(Vec::new());

    let (_handle, mut controller) =
        ProtocolController::new(set_id, proto_config, to_notifications, Arc::new(store));

    // `reserved1` connects to us; `alloc_slots` then runs with `reserved2` still unconnected.
    controller.on_incoming_connection(reserved1, IncomingIndex(1));
    controller.alloc_slots();

    let emitted: Vec<_> = std::iter::from_fn(|| from_controller.try_recv().ok()).collect();
    assert_eq!(emitted.len(), 2);
    assert!(emitted.contains(&Message::Accept(IncomingIndex(1))));
    assert!(emitted.contains(&Message::Connect { set_id, peer_id: reserved2 }));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved1),
        Some(PeerState::Connected(Direction::Inbound))
    ));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved2),
        Some(PeerState::Connected(Direction::Outbound))
    ));

    // Second incoming request from the inbound peer: accepted, still inbound.
    controller.on_incoming_connection(reserved1, IncomingIndex(2));
    assert_eq!(from_controller.try_recv().unwrap(), Message::Accept(IncomingIndex(2)));
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved1),
        Some(PeerState::Connected(Direction::Inbound))
    ));

    // Incoming request from the outbound peer: accepted, direction flips to inbound.
    controller.on_incoming_connection(reserved2, IncomingIndex(3));
    assert_eq!(from_controller.try_recv().unwrap(), Message::Accept(IncomingIndex(3)));
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
    assert!(matches!(
        controller.reserved_nodes.get(&reserved2),
        Some(PeerState::Connected(Direction::Inbound))
    ));
}
1785
/// A new incoming request from an already connected regular peer is accepted, and the peer is
/// recorded in `nodes` as `Direction::Inbound` regardless of its previous direction.
#[test]
fn incoming_request_for_connected_regular_node_switches_it_to_inbound() {
    let regular1 = PeerId::random();
    let regular2 = PeerId::random();
    let outgoing_candidates = vec![regular1.into()];

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().times(3).return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Connect `regular1` as outbound (it is the only candidate from the peer store).
    controller.alloc_slots();
    assert_eq!(
        rx.try_recv().unwrap(),
        Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
    );
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

    // Connect `regular2` as inbound.
    controller.on_incoming_connection(regular2, IncomingIndex(0));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

    // Incoming request from the outbound peer: accepted, direction flips to inbound.
    controller.on_incoming_connection(regular1, IncomingIndex(1));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(1)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Inbound)));

    // Incoming request from the inbound peer: accepted, still inbound.
    controller.on_incoming_connection(regular2, IncomingIndex(2));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(2)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));
}
1837
/// An incoming request from an already connected regular peer that the peer store now
/// reports as banned is rejected, and the peer is removed from `nodes`.
#[test]
fn incoming_request_for_connected_node_is_rejected_if_its_banned() {
    let regular1 = PeerId::random();
    let regular2 = PeerId::random();
    let outgoing_candidates = vec![regular1.into()];

    let config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    // Two `is_banned` expectations: one call answering `false`, two answering `true`.
    // NOTE(review): relies on the mock consuming expectations in declaration order, so the
    // first query (initial accept of `regular2`) is `false` and the later two are `true`.
    peer_store.expect_is_banned().once().return_const(false);
    peer_store.expect_is_banned().times(2).return_const(true);
    peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Connect `regular1` as outbound (it is the only candidate from the peer store).
    controller.alloc_slots();
    assert_eq!(
        rx.try_recv().unwrap(),
        Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
    );
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

    // Connect `regular2` as inbound.
    controller.on_incoming_connection(regular2, IncomingIndex(0));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

    // Incoming request from the (now banned) outbound peer: rejected and forgotten.
    controller.on_incoming_connection(regular1, IncomingIndex(1));
    assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(!controller.nodes.contains_key(&regular1));

    // Incoming request from the (now banned) inbound peer: rejected and forgotten.
    controller.on_incoming_connection(regular2, IncomingIndex(2));
    assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(!controller.nodes.contains_key(&regular2));
}
1890
/// Once `max_in` is lowered to zero, an incoming request from an already connected regular
/// peer is rejected and the peer is removed from `nodes`.
#[test]
fn incoming_request_for_connected_node_is_rejected_if_no_slots_available() {
    let regular1 = PeerId::random();
    let regular2 = PeerId::random();
    let outgoing_candidates = vec![regular1.into()];

    let config = ProtoSetConfig {
        in_peers: 1,
        out_peers: 1,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut peer_store = MockPeerStoreHandle::new();
    peer_store.expect_register_protocol().once().return_const(());
    peer_store.expect_is_banned().once().return_const(false);
    peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
    assert_eq!(controller.num_out, 0);
    assert_eq!(controller.num_in, 0);

    // Connect `regular1` as outbound (it is the only candidate from the peer store).
    controller.alloc_slots();
    assert_eq!(
        rx.try_recv().unwrap(),
        Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
    );
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

    // Connect `regular2` as inbound; this uses the single configured inbound slot.
    controller.on_incoming_connection(regular2, IncomingIndex(0));
    assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

    // Shrink the inbound limit so that no inbound slot can be granted any more.
    controller.max_in = 0;

    // Incoming request from the outbound peer: rejected and forgotten.
    controller.on_incoming_connection(regular1, IncomingIndex(1));
    assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(!controller.nodes.contains_key(&regular1));

    // Incoming request from the inbound peer: rejected and forgotten.
    controller.on_incoming_connection(regular2, IncomingIndex(2));
    assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
    assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
    assert!(!controller.nodes.contains_key(&regular2));
}
1944
/// With a single inbound slot configured, the first incoming regular peer is accepted and
/// the second one is rejected.
#[test]
fn incoming_peers_that_exceed_slots_are_rejected() {
    let first = PeerId::random();
    let second = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 1,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    // The mock allows exactly one ban check during the whole test.
    store.expect_is_banned().once().return_const(false);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), proto_config, to_notifications, Arc::new(store));

    // The first peer takes the only inbound slot.
    controller.on_incoming_connection(first, IncomingIndex(1));
    assert_eq!(from_controller.try_recv().ok(), Some(Message::Accept(IncomingIndex(1))));
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));

    // The second peer does not fit and is turned away.
    controller.on_incoming_connection(second, IncomingIndex(2));
    assert_eq!(from_controller.try_recv().ok(), Some(Message::Reject(IncomingIndex(2))));
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
}
1975
/// An incoming connection from a regular peer that the peer store reports as banned is
/// answered with `Message::Reject`.
#[test]
fn banned_regular_incoming_node_is_rejected() {
    let banned_peer = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: HashSet::new(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    // The peer store claims the peer is banned.
    store.expect_is_banned().once().return_const(true);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), proto_config, to_notifications, Arc::new(store));

    controller.on_incoming_connection(banned_peer, IncomingIndex(1));

    // Exactly one message is emitted: the rejection.
    assert_eq!(from_controller.try_recv().ok(), Some(Message::Reject(IncomingIndex(1))));
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
}
2000
/// Being reserved does not override a ban: an incoming connection from a reserved peer that
/// the peer store reports as banned is answered with `Message::Reject`.
#[test]
fn banned_reserved_incoming_node_is_rejected() {
    let reserved1 = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1].iter().copied().collect(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    // The peer store claims the peer is banned.
    store.expect_is_banned().once().return_const(true);

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), proto_config, to_notifications, Arc::new(store));
    assert!(controller.reserved_nodes.contains_key(&reserved1));

    controller.on_incoming_connection(reserved1, IncomingIndex(1));

    // Exactly one message is emitted: the rejection.
    assert_eq!(from_controller.try_recv().ok(), Some(Message::Reject(IncomingIndex(1))));
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
}
2026
/// `alloc_slots` does not dial a reserved peer that the peer store reports as banned: the
/// peer stays `NotConnected` and no message is emitted.
#[test]
fn we_dont_connect_to_banned_reserved_node() {
    let reserved1 = PeerId::random();

    let proto_config = ProtoSetConfig {
        in_peers: 10,
        out_peers: 10,
        reserved_nodes: [reserved1].iter().copied().collect(),
        reserved_only: false,
    };
    let (to_notifications, mut from_controller) =
        tracing_unbounded("mpsc_test_to_notifications", 100);

    let mut store = MockPeerStoreHandle::new();
    store.expect_register_protocol().once().return_const(());
    // The peer store claims the reserved peer is banned and offers no regular candidates.
    store.expect_is_banned().once().return_const(true);
    store.expect_outgoing_candidates().once().return_const(Vec::new());

    let (_handle, mut controller) =
        ProtocolController::new(SetId::from(0), proto_config, to_notifications, Arc::new(store));
    assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));

    controller.alloc_slots();

    // The state of the reserved peer is unchanged and nothing was sent.
    assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
    assert!(matches!(from_controller.try_recv(), Err(TryRecvError::Empty)));
}
2053}