pezsc_network/
protocol_controller.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Protocol Controller. Generic implementation of peer management for protocols.
20//! Responsible for accepting/rejecting incoming connections and initiating outgoing connections,
21//! respecting the inbound and outbound peer slot counts. Communicates with `PeerStore` to get and
22//! update peer reputation values and sends commands to `Notifications`.
23//!
24//! Due to asynchronous nature of communication between `ProtocolController` and `Notifications`,
25//! `ProtocolController` has an imperfect view of the states of the peers. To reduce this
26//! desynchronization, the following measures are taken:
27//!
28//! 1. Network peer events from `Notifications` are prioritized over actions from external API and
29//!    internal actions by `ProtocolController` (like slot allocation).
30//! 2. `Notifications` ignores all commands from `ProtocolController` after sending "incoming"
31//!    request until receiving the answer to this "incoming" request.
32//! 3. After sending a "connect" message, `ProtocolController` switches the state of the peer from
33//!    `Outbound` to `Inbound` if it receives an "incoming" request from `Notifications` for this
34//!    peer.
35//!
36//! These measures do not eliminate confusing commands from `ProtocolController` completely,
37//! so `Notifications` must correctly handle seemingly inconsistent commands, like a "connect"
38//! command for the peer it thinks is already connected, and a "drop" command for a peer that
39//! was previously dropped.
40//!
41//! Even though this does not guarantee that `ProtocolController` and `Notifications` have the same
42//! view of the peers' states at any given moment, the eventual consistency is maintained.
43
44use crate::peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT};
45
46use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
47use libp2p::PeerId;
48use log::{debug, error, trace, warn};
49use pezsc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
50use pezsp_arithmetic::traits::SaturatedConversion;
51use std::{
52	collections::{HashMap, HashSet},
53	sync::Arc,
54	time::{Duration, Instant},
55};
56use wasm_timer::Delay;
57
58/// Log target for this file.
59pub const LOG_TARGET: &str = "peerset";
60
61/// `Notifications` protocol index. For historical reasons it's called `SetId`, because it
62/// used to refer to a set of peers in a peerset for this protocol.
63///
64/// Can be constructed using the `From<usize>` trait implementation based on the index of the
65/// protocol in `Notifications`.
66#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
67pub struct SetId(usize);
68
69impl SetId {
70	/// Const conversion function for initialization of hardcoded peerset indices.
71	pub const fn from(id: usize) -> Self {
72		Self(id)
73	}
74}
75
76impl From<usize> for SetId {
77	fn from(id: usize) -> Self {
78		Self(id)
79	}
80}
81
82impl From<SetId> for usize {
83	fn from(id: SetId) -> Self {
84		id.0
85	}
86}
87
88/// Configuration for a set of nodes for a specific protocol.
89#[derive(Debug)]
90pub struct ProtoSetConfig {
91	/// Maximum number of incoming links to peers.
92	pub in_peers: u32,
93
94	/// Maximum number of outgoing links to peers.
95	pub out_peers: u32,
96
97	/// Lists of nodes we should always be connected to.
98	///
99	/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
100	/// >			otherwise it will not be able to connect to them.
101	pub reserved_nodes: HashSet<PeerId>,
102
103	/// If true, we only accept nodes in [`ProtoSetConfig::reserved_nodes`].
104	pub reserved_only: bool,
105}
106
107/// Message that is sent by [`ProtocolController`] to `Notifications`.
108#[derive(Debug, PartialEq)]
109pub enum Message {
110	/// Request to open a connection to the given peer. From the point of view of the
111	/// `ProtocolController`, we are immediately connected.
112	Connect {
113		/// Set id to connect on.
114		set_id: SetId,
115		/// Peer to connect to.
116		peer_id: PeerId,
117	},
118
119	/// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`.
120	Drop {
121		/// Set id to disconnect on.
122		set_id: SetId,
123		/// Peer to disconnect from.
124		peer_id: PeerId,
125	},
126
127	/// Equivalent to `Connect` for the peer corresponding to this incoming index.
128	Accept(IncomingIndex),
129
130	/// Equivalent to `Drop` for the peer corresponding to this incoming index.
131	Reject(IncomingIndex),
132}
133
134/// Opaque identifier for an incoming connection. Allocated by the network.
135#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
136pub struct IncomingIndex(pub u64);
137
138impl From<u64> for IncomingIndex {
139	fn from(val: u64) -> Self {
140		Self(val)
141	}
142}
143
144/// External API actions.
145#[derive(Debug)]
146enum Action {
147	/// Add a reserved peer or mark already connected peer as reserved.
148	AddReservedPeer(PeerId),
149	/// Remove a reserved peer.
150	RemoveReservedPeer(PeerId),
151	/// Update reserved peers to match the provided set.
152	SetReservedPeers(HashSet<PeerId>),
153	/// Set/unset reserved-only mode.
154	SetReservedOnly(bool),
155	/// Disconnect a peer.
156	DisconnectPeer(PeerId),
157	/// Get the list of reserved peers.
158	GetReservedPeers(oneshot::Sender<Vec<PeerId>>),
159}
160
161/// Network events from `Notifications`.
162#[derive(Debug)]
163enum Event {
164	/// Incoming connection from the peer.
165	IncomingConnection(PeerId, IncomingIndex),
166	/// Connection with the peer dropped.
167	Dropped(PeerId),
168}
169
170/// Shared handle to [`ProtocolController`]. Distributed around the code outside of the
171/// protocol implementation.
172#[derive(Debug, Clone)]
173pub struct ProtocolHandle {
174	/// Actions from outer API.
175	actions_tx: TracingUnboundedSender<Action>,
176	/// Connection events from `Notifications`. We prioritize them over actions.
177	events_tx: TracingUnboundedSender<Event>,
178}
179
180impl ProtocolHandle {
181	/// Adds a new reserved peer. [`ProtocolController`] will make an effort
182	/// to always remain connected to this peer.
183	///
184	/// Has no effect if the node was already a reserved peer.
185	///
186	/// > **Note**: Keep in mind that the networking has to know an address for this node,
187	/// > otherwise it will not be able to connect to it.
188	pub fn add_reserved_peer(&self, peer_id: PeerId) {
189		let _ = self.actions_tx.unbounded_send(Action::AddReservedPeer(peer_id));
190	}
191
192	/// Demotes reserved peer to non-reserved. Does not disconnect the peer.
193	///
194	/// Has no effect if the node was not a reserved peer.
195	pub fn remove_reserved_peer(&self, peer_id: PeerId) {
196		let _ = self.actions_tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
197	}
198
199	/// Set reserved peers to the new set.
200	pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
201		let _ = self.actions_tx.unbounded_send(Action::SetReservedPeers(peer_ids));
202	}
203
204	/// Sets whether or not [`ProtocolController`] only has connections with nodes marked
205	/// as reserved for the given set.
206	pub fn set_reserved_only(&self, reserved: bool) {
207		let _ = self.actions_tx.unbounded_send(Action::SetReservedOnly(reserved));
208	}
209
210	/// Disconnect peer. You should remove the `PeerId` from the `PeerStore` first
211	/// to not connect to the peer again during the next slot allocation.
212	pub fn disconnect_peer(&self, peer_id: PeerId) {
213		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id));
214	}
215
216	/// Get the list of reserved peers.
217	pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
218		let _ = self.actions_tx.unbounded_send(Action::GetReservedPeers(pending_response));
219	}
220
221	/// Notify about incoming connection. [`ProtocolController`] will either accept or reject it.
222	pub fn incoming_connection(&self, peer_id: PeerId, incoming_index: IncomingIndex) {
223		let _ = self
224			.events_tx
225			.unbounded_send(Event::IncomingConnection(peer_id, incoming_index));
226	}
227
228	/// Notify that connection was dropped (either refused or disconnected).
229	pub fn dropped(&self, peer_id: PeerId) {
230		let _ = self.events_tx.unbounded_send(Event::Dropped(peer_id));
231	}
232}
233
234impl ProtocolHandleT for ProtocolHandle {
235	fn disconnect_peer(&self, peer_id: pezsc_network_types::PeerId) {
236		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id.into()));
237	}
238}
239
240/// Direction of a connection
241#[derive(Clone, Copy, Debug)]
242enum Direction {
243	Inbound,
244	Outbound,
245}
246
247/// Status of a connection with a peer.
248#[derive(Clone, Debug)]
249enum PeerState {
250	/// We are connected to the peer.
251	Connected(Direction),
252	/// We are not connected.
253	NotConnected,
254}
255
256impl PeerState {
257	/// Returns true if we are connected with the node.
258	fn is_connected(&self) -> bool {
259		matches!(self, PeerState::Connected(_))
260	}
261}
262
263impl Default for PeerState {
264	fn default() -> PeerState {
265		PeerState::NotConnected
266	}
267}
268
269/// Worker side of [`ProtocolHandle`] responsible for all the logic.
270#[derive(Debug)]
271pub struct ProtocolController {
272	/// Set id to use when sending connect/drop requests to `Notifications`.
273	// Will likely be replaced by `ProtocolName` in the future.
274	set_id: SetId,
275	/// Receiver for outer API messages from [`ProtocolHandle`].
276	actions_rx: TracingUnboundedReceiver<Action>,
277	/// Receiver for connection events from `Notifications` sent via [`ProtocolHandle`].
278	events_rx: TracingUnboundedReceiver<Event>,
279	/// Number of occupied slots for incoming connections (not counting reserved nodes).
280	num_in: u32,
281	/// Number of occupied slots for outgoing connections (not counting reserved nodes).
282	num_out: u32,
283	/// Maximum number of slots for incoming connections (not counting reserved nodes).
284	max_in: u32,
285	/// Maximum number of slots for outgoing connections (not counting reserved nodes).
286	max_out: u32,
287	/// Connected regular nodes.
288	nodes: HashMap<PeerId, Direction>,
289	/// Reserved nodes. Should be always connected and do not occupy peer slots.
290	reserved_nodes: HashMap<PeerId, PeerState>,
291	/// Connect only to reserved nodes.
292	reserved_only: bool,
293	/// Next time to allocate slots. This is done once per second.
294	next_periodic_alloc_slots: Instant,
295	/// Outgoing channel for messages to `Notifications`.
296	to_notifications: TracingUnboundedSender<Message>,
297	/// `PeerStore` handle for checking peer reputation values and getting connection candidates
298	/// with highest reputation.
299	peer_store: Arc<dyn PeerStoreProvider>,
300}
301
302impl ProtocolController {
303	/// Construct new [`ProtocolController`].
304	pub fn new(
305		set_id: SetId,
306		config: ProtoSetConfig,
307		to_notifications: TracingUnboundedSender<Message>,
308		peer_store: Arc<dyn PeerStoreProvider>,
309	) -> (ProtocolHandle, ProtocolController) {
310		let (actions_tx, actions_rx) = tracing_unbounded("mpsc_api_protocol", 10_000);
311		let (events_tx, events_rx) = tracing_unbounded("mpsc_notifications_protocol", 10_000);
312		let handle = ProtocolHandle { actions_tx, events_tx };
313		peer_store.register_protocol(Arc::new(handle.clone()));
314		let reserved_nodes =
315			config.reserved_nodes.iter().map(|p| (*p, PeerState::NotConnected)).collect();
316		let controller = ProtocolController {
317			set_id,
318			actions_rx,
319			events_rx,
320			num_in: 0,
321			num_out: 0,
322			max_in: config.in_peers,
323			max_out: config.out_peers,
324			nodes: HashMap::new(),
325			reserved_nodes,
326			reserved_only: config.reserved_only,
327			next_periodic_alloc_slots: Instant::now(),
328			to_notifications,
329			peer_store,
330		};
331		(handle, controller)
332	}
333
334	/// Drive [`ProtocolController`]. This function returns when all instances of
335	/// [`ProtocolHandle`] are dropped.
336	pub async fn run(mut self) {
337		while self.next_action().await {}
338	}
339
340	/// Perform one action. Returns `true` if it should be called again.
341	///
342	/// Intended for tests only. Use `run` for driving [`ProtocolController`].
343	pub async fn next_action(&mut self) -> bool {
344		let either = loop {
345			let mut next_alloc_slots = Delay::new_at(self.next_periodic_alloc_slots).fuse();
346
347			// See the module doc for why we use `select_biased!`.
348			futures::select_biased! {
349				event = self.events_rx.next() => match event {
350					Some(event) => break Either::Left(event),
351					None => return false,
352				},
353				action = self.actions_rx.next() => match action {
354					Some(action) => break Either::Right(action),
355					None => return false,
356				},
357				_ = next_alloc_slots => {
358					self.alloc_slots();
359					self.next_periodic_alloc_slots = Instant::now() + Duration::new(1, 0);
360				},
361			}
362		};
363
364		match either {
365			Either::Left(event) => self.process_event(event),
366			Either::Right(action) => self.process_action(action),
367		}
368
369		true
370	}
371
372	/// Process connection event.
373	fn process_event(&mut self, event: Event) {
374		match event {
375			Event::IncomingConnection(peer_id, index) => {
376				self.on_incoming_connection(peer_id, index)
377			},
378			Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
379		}
380	}
381
382	/// Process action command.
383	fn process_action(&mut self, action: Action) {
384		match action {
385			Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
386			Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
387			Action::SetReservedPeers(peer_ids) => self.on_set_reserved_peers(peer_ids),
388			Action::SetReservedOnly(reserved_only) => self.on_set_reserved_only(reserved_only),
389			Action::DisconnectPeer(peer_id) => self.on_disconnect_peer(peer_id),
390			Action::GetReservedPeers(pending_response) => {
391				self.on_get_reserved_peers(pending_response)
392			},
393		}
394	}
395
396	/// Send "accept" message to `Notifications`.
397	fn accept_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
398		trace!(
399			target: LOG_TARGET,
400			"Accepting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
401			self.set_id,
402			self.num_in,
403			self.max_in,
404		);
405
406		let _ = self.to_notifications.unbounded_send(Message::Accept(incoming_index));
407	}
408
409	/// Send "reject" message to `Notifications`.
410	fn reject_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
411		trace!(
412			target: LOG_TARGET,
413			"Rejecting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
414			self.set_id,
415			self.num_in,
416			self.max_in,
417		);
418
419		let _ = self.to_notifications.unbounded_send(Message::Reject(incoming_index));
420	}
421
422	/// Send "connect" message to `Notifications`.
423	fn start_connection(&mut self, peer_id: PeerId) {
424		trace!(
425			target: LOG_TARGET,
426			"Connecting to {peer_id} on {:?} ({}/{} num_out/max_out).",
427			self.set_id,
428			self.num_out,
429			self.max_out,
430		);
431
432		let _ = self
433			.to_notifications
434			.unbounded_send(Message::Connect { set_id: self.set_id, peer_id });
435	}
436
437	/// Send "drop" message to `Notifications`.
438	fn drop_connection(&mut self, peer_id: PeerId) {
439		trace!(
440			target: LOG_TARGET,
441			"Dropping {peer_id} on {:?} ({}/{} num_in/max_in, {}/{} num_out/max_out).",
442			self.set_id,
443			self.num_in,
444			self.max_in,
445			self.num_out,
446			self.max_out,
447		);
448
449		let _ = self
450			.to_notifications
451			.unbounded_send(Message::Drop { set_id: self.set_id, peer_id });
452	}
453
454	/// Report peer disconnect event to `PeerStore` for it to update peer's reputation accordingly.
455	/// Should only be called if the remote node disconnected us, not the other way around.
456	fn report_disconnect(&mut self, peer_id: PeerId) {
457		self.peer_store.report_disconnect(peer_id.into());
458	}
459
460	/// Ask `Peerset` if the peer has a reputation value not sufficient for connection with it.
461	fn is_banned(&self, peer_id: &PeerId) -> bool {
462		self.peer_store.is_banned(&peer_id.into())
463	}
464
465	/// Add the peer to the set of reserved peers. [`ProtocolController`] will try to always
466	/// maintain connections with such peers.
467	fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
468		if self.reserved_nodes.contains_key(&peer_id) {
469			debug!(
470				target: LOG_TARGET,
471				"Trying to add an already reserved node {peer_id} as reserved on {:?}.",
472				self.set_id,
473			);
474			return;
475		}
476
477		// Get the peer out of non-reserved peers if it's there.
478		let state = match self.nodes.remove(&peer_id) {
479			Some(direction) => {
480				trace!(
481					target: LOG_TARGET,
482					"Marking previously connected node {} ({:?}) as reserved on {:?}.",
483					peer_id,
484					direction,
485					self.set_id
486				);
487				PeerState::Connected(direction)
488			},
489			None => {
490				trace!(target: LOG_TARGET, "Adding reserved node {peer_id} on {:?}.", self.set_id,);
491				PeerState::NotConnected
492			},
493		};
494
495		self.reserved_nodes.insert(peer_id, state.clone());
496
497		// Discount occupied slots or connect to the node.
498		match state {
499			PeerState::Connected(Direction::Inbound) => self.num_in -= 1,
500			PeerState::Connected(Direction::Outbound) => self.num_out -= 1,
501			PeerState::NotConnected => self.alloc_slots(),
502		}
503	}
504
505	/// Remove the peer from the set of reserved peers. The peer is either moved to the set of
506	/// regular nodes or disconnected.
507	fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
508		let state = match self.reserved_nodes.remove(&peer_id) {
509			Some(state) => state,
510			None => {
511				warn!(
512					target: LOG_TARGET,
513					"Trying to remove unknown reserved node {peer_id} from {:?}.", self.set_id,
514				);
515				return;
516			},
517		};
518
519		if let PeerState::Connected(direction) = state {
520			// Disconnect if we're at (or over) the regular node limit
521			let disconnect = self.reserved_only
522				|| match direction {
523					Direction::Inbound => self.num_in >= self.max_in,
524					Direction::Outbound => self.num_out >= self.max_out,
525				};
526
527			if disconnect {
528				// Disconnect the node.
529				trace!(
530					target: LOG_TARGET,
531					"Disconnecting previously reserved node {peer_id} ({direction:?}) on {:?}.",
532					self.set_id,
533				);
534				self.drop_connection(peer_id);
535			} else {
536				// Count connections as of regular node.
537				trace!(
538					target: LOG_TARGET,
539					"Making a connected reserved node {peer_id} ({:?}) on {:?} a regular one.",
540					direction,
541					self.set_id,
542				);
543
544				match direction {
545					Direction::Inbound => self.num_in += 1,
546					Direction::Outbound => self.num_out += 1,
547				}
548
549				// Put the node into the list of regular nodes.
550				let prev = self.nodes.insert(peer_id, direction);
551				assert!(prev.is_none(), "Corrupted state: reserved node was also non-reserved.");
552			}
553		} else {
554			trace!(
555				target: LOG_TARGET,
556				"Removed disconnected reserved node {peer_id} from {:?}.",
557				self.set_id,
558			);
559		}
560	}
561
562	/// Replace the set of reserved peers.
563	fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
564		// Determine the difference between the current group and the new list.
565		let current = self.reserved_nodes.keys().cloned().collect();
566		let to_insert = peer_ids.difference(&current).cloned().collect::<Vec<_>>();
567		let to_remove = current.difference(&peer_ids).cloned().collect::<Vec<_>>();
568
569		for node in to_insert {
570			self.on_add_reserved_peer(node);
571		}
572
573		for node in to_remove {
574			self.on_remove_reserved_peer(node);
575		}
576	}
577
578	/// Change "reserved only" flag. In "reserved only" mode we connect and accept connections to
579	/// reserved nodes only.
580	fn on_set_reserved_only(&mut self, reserved_only: bool) {
581		trace!(target: LOG_TARGET, "Set reserved only to `{reserved_only}` on {:?}", self.set_id);
582
583		self.reserved_only = reserved_only;
584
585		if !reserved_only {
586			return self.alloc_slots();
587		}
588
589		// Disconnect all non-reserved peers.
590		self.nodes
591			.iter()
592			.map(|(k, v)| (*k, *v))
593			.collect::<Vec<(_, _)>>()
594			.iter()
595			.for_each(|(peer_id, direction)| {
596				// Update counters in the loop for `drop_connection` to report the correct number.
597				match direction {
598					Direction::Inbound => self.num_in -= 1,
599					Direction::Outbound => self.num_out -= 1,
600				}
601				self.drop_connection(*peer_id)
602			});
603		self.nodes.clear();
604	}
605
606	/// Get the list of reserved peers.
607	fn on_get_reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
608		let _ = pending_response.send(self.reserved_nodes.keys().cloned().collect());
609	}
610
611	/// Disconnect the peer.
612	fn on_disconnect_peer(&mut self, peer_id: PeerId) {
613		// Don't do anything if the node is reserved.
614		if self.reserved_nodes.contains_key(&peer_id) {
615			debug!(
616				target: LOG_TARGET,
617				"Ignoring request to disconnect reserved peer {peer_id} from {:?}.", self.set_id,
618			);
619			return;
620		}
621
622		match self.nodes.remove(&peer_id) {
623			Some(direction) => {
624				trace!(
625					target: LOG_TARGET,
626					"Disconnecting peer {peer_id} ({direction:?}) from {:?}.",
627					self.set_id
628				);
629				match direction {
630					Direction::Inbound => self.num_in -= 1,
631					Direction::Outbound => self.num_out -= 1,
632				}
633				self.drop_connection(peer_id);
634			},
635			None => {
636				debug!(
637					target: LOG_TARGET,
638					"Trying to disconnect unknown peer {peer_id} from {:?}.", self.set_id,
639				);
640			},
641		}
642	}
643
644	/// Indicate that we received an incoming connection. Must be answered either with
645	/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
646	///
647	/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
648	/// connection implicitly means `Connect`, but incoming connections aren't cancelled by
649	/// `dropped`.
650	// Implementation note: because of concurrency issues, `ProtocolController` has an imperfect
651	// view of the peers' states, and may issue commands for a peer after `Notifications` received
652	// an incoming request for that peer. In this case, `Notifications` ignores all the commands
653	// until it receives a response for the incoming request to `ProtocolController`, so we must
654	// ensure we handle this incoming request correctly.
655	fn on_incoming_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
656		trace!(
657			target: LOG_TARGET,
658			"Incoming connection from peer {peer_id} ({incoming_index:?}) on {:?}.",
659			self.set_id,
660		);
661
662		if self.reserved_only && !self.reserved_nodes.contains_key(&peer_id) {
663			self.reject_connection(peer_id, incoming_index);
664			return;
665		}
666
667		// Check if the node is reserved first.
668		if let Some(state) = self.reserved_nodes.get_mut(&peer_id) {
669			match state {
670				PeerState::Connected(ref mut direction) => {
671					// We are accepting an incoming connection, so ensure the direction is inbound.
672					// (See the implementation note above.)
673					*direction = Direction::Inbound;
674					self.accept_connection(peer_id, incoming_index);
675				},
676				PeerState::NotConnected => {
677					if self.peer_store.is_banned(&peer_id.into()) {
678						self.reject_connection(peer_id, incoming_index);
679					} else {
680						*state = PeerState::Connected(Direction::Inbound);
681						self.accept_connection(peer_id, incoming_index);
682					}
683				},
684			}
685			return;
686		}
687
688		// If we're already connected, pretend we are not connected and decide on the node again.
689		// (See the note above.)
690		if let Some(direction) = self.nodes.remove(&peer_id) {
691			trace!(
692				target: LOG_TARGET,
693				"Handling incoming connection from peer {} we think we already connected as {:?} on {:?}.",
694				peer_id,
695				direction,
696				self.set_id
697			);
698			match direction {
699				Direction::Inbound => self.num_in -= 1,
700				Direction::Outbound => self.num_out -= 1,
701			}
702		}
703
704		if self.num_in >= self.max_in {
705			self.reject_connection(peer_id, incoming_index);
706			return;
707		}
708
709		if self.is_banned(&peer_id) {
710			self.reject_connection(peer_id, incoming_index);
711			return;
712		}
713
714		self.num_in += 1;
715		self.nodes.insert(peer_id, Direction::Inbound);
716		self.accept_connection(peer_id, incoming_index);
717	}
718
719	/// Indicate that a connection with the peer was dropped.
720	fn on_peer_dropped(&mut self, peer_id: PeerId) {
721		self.on_peer_dropped_inner(peer_id).unwrap_or_else(|peer_id| {
722			// We do not assert here, because due to asynchronous nature of communication
723			// between `ProtocolController` and `Notifications` we can receive `Action::Dropped`
724			// for a peer we already disconnected ourself.
725			trace!(
726				target: LOG_TARGET,
727				"Received `Action::Dropped` for not connected peer {peer_id} on {:?}.",
728				self.set_id,
729			)
730		});
731	}
732
733	/// Indicate that a connection with the peer was dropped.
734	/// Returns `Err(PeerId)` if the peer wasn't connected or is not known to us.
735	fn on_peer_dropped_inner(&mut self, peer_id: PeerId) -> Result<(), PeerId> {
736		if self.drop_reserved_peer(&peer_id)? || self.drop_regular_peer(&peer_id) {
737			// The peer found and disconnected.
738			self.report_disconnect(peer_id);
739			Ok(())
740		} else {
741			// The peer was not found in neither regular or reserved lists.
742			Err(peer_id)
743		}
744	}
745
746	/// Try dropping the peer as a reserved peer. Return `Ok(true)` if the peer was found and
747	/// disconnected, `Ok(false)` if it wasn't found, `Err(PeerId)`, if the peer found, but not in
748	/// connected state.
749	fn drop_reserved_peer(&mut self, peer_id: &PeerId) -> Result<bool, PeerId> {
750		let Some(state) = self.reserved_nodes.get_mut(peer_id) else { return Ok(false) };
751
752		if let PeerState::Connected(direction) = state {
753			trace!(
754				target: LOG_TARGET,
755				"Reserved peer {peer_id} ({direction:?}) dropped from {:?}.",
756				self.set_id,
757			);
758			*state = PeerState::NotConnected;
759			Ok(true)
760		} else {
761			Err(*peer_id)
762		}
763	}
764
765	/// Try dropping the peer as a regular peer. Return `true` if the peer was found and
766	/// disconnected, `false` if it wasn't found.
767	fn drop_regular_peer(&mut self, peer_id: &PeerId) -> bool {
768		let Some(direction) = self.nodes.remove(peer_id) else { return false };
769
770		trace!(
771			target: LOG_TARGET,
772			"Peer {peer_id} ({direction:?}) dropped from {:?}.",
773			self.set_id,
774		);
775
776		match direction {
777			Direction::Inbound => self.num_in -= 1,
778			Direction::Outbound => self.num_out -= 1,
779		}
780
781		true
782	}
783
784	/// Initiate outgoing connections trying to connect all reserved nodes and fill in all outgoing
785	/// slots.
786	fn alloc_slots(&mut self) {
787		// Try connecting to reserved nodes first, ignoring nodes with outstanding events/actions.
788		self.reserved_nodes
789			.iter_mut()
790			.filter_map(|(peer_id, state)| {
791				(!state.is_connected() && !self.peer_store.is_banned(&peer_id.into())).then(|| {
792					*state = PeerState::Connected(Direction::Outbound);
793					peer_id
794				})
795			})
796			.cloned()
797			.collect::<Vec<_>>()
798			.into_iter()
799			.for_each(|peer_id| {
800				self.start_connection(peer_id);
801			});
802
803		// Nothing more to do if we're in reserved-only mode or don't have slots available.
804		if self.reserved_only || self.num_out >= self.max_out {
805			return;
806		}
807
808		// Fill available slots.
809		let available_slots = (self.max_out - self.num_out).saturated_into();
810
811		// Ignore reserved nodes (connected above), already connected nodes, and nodes with
812		// outstanding events/actions.
813		let ignored = self
814			.reserved_nodes
815			.keys()
816			.map(From::from)
817			.collect::<HashSet<pezsc_network_types::PeerId>>()
818			.union(
819				&self
820					.nodes
821					.keys()
822					.map(From::from)
823					.collect::<HashSet<pezsc_network_types::PeerId>>(),
824			)
825			.cloned()
826			.collect();
827
828		let candidates = self
829			.peer_store
830			.outgoing_candidates(available_slots, ignored)
831			.into_iter()
832			.filter_map(|peer_id| {
833				(!self.reserved_nodes.contains_key(&peer_id.into())
834					&& !self.nodes.contains_key(&peer_id.into()))
835				.then_some(peer_id)
836				.or_else(|| {
837					error!(
838						target: LOG_TARGET,
839						"`PeerStore` returned a node we asked to ignore: {peer_id}.",
840					);
841					debug_assert!(false, "`PeerStore` returned a node we asked to ignore.");
842					None
843				})
844			})
845			.collect::<Vec<_>>();
846
847		if candidates.len() > available_slots {
848			error!(
849				target: LOG_TARGET,
850				"`PeerStore` returned more nodes than there are slots available.",
851			);
852			debug_assert!(false, "`PeerStore` returned more nodes than there are slots available.");
853		}
854
855		candidates.into_iter().take(available_slots).for_each(|peer_id| {
856			self.num_out += 1;
857			self.nodes.insert(peer_id.into(), Direction::Outbound);
858			self.start_connection(peer_id.into());
859		})
860	}
861}
862
863#[cfg(test)]
864mod tests {
865	use super::*;
866	use crate::{
867		peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT},
868		ReputationChange,
869	};
870	use libp2p::PeerId;
871	use pezsc_network_common::role::ObservedRole;
872	use pezsc_utils::mpsc::{tracing_unbounded, TryRecvError};
873	use std::collections::HashSet;
874
875	mockall::mock! {
876		#[derive(Debug)]
877		pub PeerStoreHandle {}
878
879		impl PeerStoreProvider for PeerStoreHandle {
880			fn is_banned(&self, peer_id: &pezsc_network_types::PeerId) -> bool;
881			fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandleT>);
882			fn report_disconnect(&self, peer_id: pezsc_network_types::PeerId);
883			fn set_peer_role(&self, peer_id: &pezsc_network_types::PeerId, role: ObservedRole);
884			fn report_peer(&self, peer_id: pezsc_network_types::PeerId, change: ReputationChange);
885			fn peer_reputation(&self, peer_id: &pezsc_network_types::PeerId) -> i32;
886			fn peer_role(&self, peer_id: &pezsc_network_types::PeerId) -> Option<ObservedRole>;
887			fn outgoing_candidates(&self, count: usize, ignored: HashSet<pezsc_network_types::PeerId>) -> Vec<pezsc_network_types::PeerId>;
888			fn add_known_peer(&self, peer_id: pezsc_network_types::PeerId);
889		}
890	}
891
892	#[test]
893	fn reserved_nodes_are_connected_dropped_and_accepted() {
894		let reserved1 = PeerId::random();
895		let reserved2 = PeerId::random();
896
897		// Add first reserved node via config.
898		let config = ProtoSetConfig {
899			in_peers: 0,
900			out_peers: 0,
901			reserved_nodes: std::iter::once(reserved1).collect(),
902			reserved_only: true,
903		};
904		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
905
906		let mut peer_store = MockPeerStoreHandle::new();
907		peer_store.expect_register_protocol().once().return_const(());
908		peer_store.expect_is_banned().times(4).return_const(false);
909		peer_store.expect_report_disconnect().times(2).return_const(());
910
911		let (_handle, mut controller) =
912			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
913
914		// Add second reserved node at runtime (this currently calls `alloc_slots` internally).
915		controller.on_add_reserved_peer(reserved2);
916
917		// Initiate connections (currently, `alloc_slots` is also called internally in
918		// `on_add_reserved_peer` above).
919		controller.alloc_slots();
920
921		let mut messages = Vec::new();
922		while let Some(message) = rx.try_recv().ok() {
923			messages.push(message);
924		}
925		assert_eq!(messages.len(), 2);
926		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
927		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
928
929		// Reserved peers do not occupy slots.
930		assert_eq!(controller.num_out, 0);
931		assert_eq!(controller.num_in, 0);
932
933		// Drop connections to be able to accept reserved nodes.
934		controller.on_peer_dropped(reserved1);
935		controller.on_peer_dropped(reserved2);
936
937		// Incoming connection from `reserved1`.
938		let incoming1 = IncomingIndex(1);
939		controller.on_incoming_connection(reserved1, incoming1);
940		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming1));
941		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
942
943		// Incoming connection from `reserved2`.
944		let incoming2 = IncomingIndex(2);
945		controller.on_incoming_connection(reserved2, incoming2);
946		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming2));
947		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
948
949		// Reserved peers do not occupy slots.
950		assert_eq!(controller.num_out, 0);
951		assert_eq!(controller.num_in, 0);
952	}
953
954	#[test]
955	fn banned_reserved_nodes_are_not_connected_and_not_accepted() {
956		let reserved1 = PeerId::random();
957		let reserved2 = PeerId::random();
958
959		// Add first reserved node via config.
960		let config = ProtoSetConfig {
961			in_peers: 0,
962			out_peers: 0,
963			reserved_nodes: std::iter::once(reserved1).collect(),
964			reserved_only: true,
965		};
966		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
967
968		let mut peer_store = MockPeerStoreHandle::new();
969		peer_store.expect_register_protocol().once().return_const(());
970		peer_store.expect_is_banned().times(6).return_const(true);
971
972		let (_handle, mut controller) =
973			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
974
975		// Add second reserved node at runtime (this currently calls `alloc_slots` internally).
976		controller.on_add_reserved_peer(reserved2);
977
978		// Initiate connections.
979		controller.alloc_slots();
980
981		// No slots occupied.
982		assert_eq!(controller.num_out, 0);
983		assert_eq!(controller.num_in, 0);
984
985		// No commands are generated.
986		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
987
988		// Incoming connection from `reserved1`.
989		let incoming1 = IncomingIndex(1);
990		controller.on_incoming_connection(reserved1, incoming1);
991		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming1));
992		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
993
994		// Incoming connection from `reserved2`.
995		let incoming2 = IncomingIndex(2);
996		controller.on_incoming_connection(reserved2, incoming2);
997		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming2));
998		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
999
1000		// No slots occupied.
1001		assert_eq!(controller.num_out, 0);
1002		assert_eq!(controller.num_in, 0);
1003	}
1004
1005	#[test]
1006	fn we_try_to_reconnect_to_dropped_reserved_nodes() {
1007		let reserved1 = PeerId::random();
1008		let reserved2 = PeerId::random();
1009
1010		// Add first reserved node via config.
1011		let config = ProtoSetConfig {
1012			in_peers: 0,
1013			out_peers: 0,
1014			reserved_nodes: std::iter::once(reserved1).collect(),
1015			reserved_only: true,
1016		};
1017		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1018
1019		let mut peer_store = MockPeerStoreHandle::new();
1020		peer_store.expect_register_protocol().once().return_const(());
1021		peer_store.expect_is_banned().times(4).return_const(false);
1022		peer_store.expect_report_disconnect().times(2).return_const(());
1023
1024		let (_handle, mut controller) =
1025			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1026
1027		// Add second reserved node at runtime (this calls `alloc_slots` internally).
1028		controller.on_add_reserved_peer(reserved2);
1029
1030		// Initiate connections (actually redundant, see previous comment).
1031		controller.alloc_slots();
1032
1033		let mut messages = Vec::new();
1034		while let Some(message) = rx.try_recv().ok() {
1035			messages.push(message);
1036		}
1037
1038		assert_eq!(messages.len(), 2);
1039		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1040		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1041
1042		// Drop both reserved nodes.
1043		controller.on_peer_dropped(reserved1);
1044		controller.on_peer_dropped(reserved2);
1045
1046		// Initiate connections.
1047		controller.alloc_slots();
1048
1049		let mut messages = Vec::new();
1050		while let Some(message) = rx.try_recv().ok() {
1051			messages.push(message);
1052		}
1053
1054		assert_eq!(messages.len(), 2);
1055		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1056		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1057
1058		// No slots occupied.
1059		assert_eq!(controller.num_out, 0);
1060		assert_eq!(controller.num_in, 0);
1061	}
1062
1063	#[test]
1064	fn nodes_supplied_by_peer_store_are_connected() {
1065		let peer1 = PeerId::random();
1066		let peer2 = PeerId::random();
1067		let candidates = vec![peer1.into(), peer2.into()];
1068
1069		let config = ProtoSetConfig {
1070			in_peers: 0,
1071			// Less slots than candidates.
1072			out_peers: 2,
1073			reserved_nodes: HashSet::new(),
1074			reserved_only: false,
1075		};
1076		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1077
1078		let mut peer_store = MockPeerStoreHandle::new();
1079		peer_store.expect_register_protocol().once().return_const(());
1080		peer_store.expect_outgoing_candidates().once().return_const(candidates);
1081
1082		let (_handle, mut controller) =
1083			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1084
1085		// Initiate connections.
1086		controller.alloc_slots();
1087
1088		let mut messages = Vec::new();
1089		while let Some(message) = rx.try_recv().ok() {
1090			messages.push(message);
1091		}
1092
1093		// Only first two peers are connected (we only have 2 slots).
1094		assert_eq!(messages.len(), 2);
1095		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1096		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1097
1098		// Outgoing slots occupied.
1099		assert_eq!(controller.num_out, 2);
1100		assert_eq!(controller.num_in, 0);
1101
1102		// No more nodes are connected.
1103		controller.alloc_slots();
1104		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1105
1106		// No more slots occupied.
1107		assert_eq!(controller.num_out, 2);
1108		assert_eq!(controller.num_in, 0);
1109	}
1110
1111	#[test]
1112	fn both_reserved_nodes_and_nodes_supplied_by_peer_store_are_connected() {
1113		let reserved1 = PeerId::random();
1114		let reserved2 = PeerId::random();
1115		let regular1 = PeerId::random();
1116		let regular2 = PeerId::random();
1117		let outgoing_candidates = vec![regular1.into(), regular2.into()];
1118		let reserved_nodes = [reserved1, reserved2].iter().cloned().collect();
1119
1120		let config =
1121			ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false };
1122		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1123
1124		let mut peer_store = MockPeerStoreHandle::new();
1125		peer_store.expect_register_protocol().once().return_const(());
1126		peer_store.expect_is_banned().times(2).return_const(false);
1127		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1128
1129		let (_handle, mut controller) =
1130			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1131
1132		// Initiate connections.
1133		controller.alloc_slots();
1134
1135		let mut messages = Vec::new();
1136		while let Some(message) = rx.try_recv().ok() {
1137			messages.push(message);
1138		}
1139		assert_eq!(messages.len(), 4);
1140		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1141		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1142		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1143		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
1144		assert_eq!(controller.num_out, 2);
1145		assert_eq!(controller.num_in, 0);
1146	}
1147
1148	#[test]
1149	fn if_slots_are_freed_we_try_to_allocate_them_again() {
1150		let peer1 = PeerId::random();
1151		let peer2 = PeerId::random();
1152		let peer3 = PeerId::random();
1153		let candidates1 = vec![peer1.into(), peer2.into()];
1154		let candidates2 = vec![peer3.into()];
1155
1156		let config = ProtoSetConfig {
1157			in_peers: 0,
1158			// Less slots than candidates.
1159			out_peers: 2,
1160			reserved_nodes: HashSet::new(),
1161			reserved_only: false,
1162		};
1163		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1164
1165		let mut peer_store = MockPeerStoreHandle::new();
1166		peer_store.expect_register_protocol().once().return_const(());
1167		peer_store.expect_outgoing_candidates().once().return_const(candidates1);
1168		peer_store.expect_outgoing_candidates().once().return_const(candidates2);
1169		peer_store.expect_report_disconnect().times(2).return_const(());
1170
1171		let (_handle, mut controller) =
1172			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1173
1174		// Initiate connections.
1175		controller.alloc_slots();
1176
1177		let mut messages = Vec::new();
1178		while let Some(message) = rx.try_recv().ok() {
1179			messages.push(message);
1180		}
1181
1182		// Only first two peers are connected (we only have 2 slots).
1183		assert_eq!(messages.len(), 2);
1184		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1185		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1186
1187		// Outgoing slots occupied.
1188		assert_eq!(controller.num_out, 2);
1189		assert_eq!(controller.num_in, 0);
1190
1191		// No more nodes are connected.
1192		controller.alloc_slots();
1193		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1194
1195		// No more slots occupied.
1196		assert_eq!(controller.num_out, 2);
1197		assert_eq!(controller.num_in, 0);
1198
1199		// Drop peers.
1200		controller.on_peer_dropped(peer1);
1201		controller.on_peer_dropped(peer2);
1202
1203		// Slots are freed.
1204		assert_eq!(controller.num_out, 0);
1205		assert_eq!(controller.num_in, 0);
1206
1207		// Initiate connections.
1208		controller.alloc_slots();
1209
1210		let mut messages = Vec::new();
1211		while let Some(message) = rx.try_recv().ok() {
1212			messages.push(message);
1213		}
1214
1215		// Peers are connected.
1216		assert_eq!(messages.len(), 1);
1217		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
1218
1219		// Outgoing slots occupied.
1220		assert_eq!(controller.num_out, 1);
1221		assert_eq!(controller.num_in, 0);
1222	}
1223
1224	#[test]
1225	fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() {
1226		let config = ProtoSetConfig {
1227			in_peers: 0,
1228			// Make sure we have slots available.
1229			out_peers: 2,
1230			reserved_nodes: HashSet::new(),
1231			reserved_only: true,
1232		};
1233		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1234
1235		let mut peer_store = MockPeerStoreHandle::new();
1236		peer_store.expect_register_protocol().once().return_const(());
1237
1238		let (_handle, mut controller) =
1239			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1240
1241		// Initiate connections.
1242		controller.alloc_slots();
1243
1244		// No nodes are connected.
1245		assert_eq!(controller.num_out, 0);
1246		assert_eq!(controller.num_in, 0);
1247		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1248	}
1249
1250	#[test]
1251	fn in_reserved_only_mode_no_regular_peers_are_accepted() {
1252		let config = ProtoSetConfig {
1253			// Make sure we have slots available.
1254			in_peers: 2,
1255			out_peers: 0,
1256			reserved_nodes: HashSet::new(),
1257			reserved_only: true,
1258		};
1259		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1260
1261		let mut peer_store = MockPeerStoreHandle::new();
1262		peer_store.expect_register_protocol().once().return_const(());
1263
1264		let (_handle, mut controller) =
1265			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1266
1267		let peer = PeerId::random();
1268		let incoming_index = IncomingIndex(1);
1269		controller.on_incoming_connection(peer, incoming_index);
1270
1271		let mut messages = Vec::new();
1272		while let Some(message) = rx.try_recv().ok() {
1273			messages.push(message);
1274		}
1275
1276		// Peer is rejected.
1277		assert_eq!(messages.len(), 1);
1278		assert!(messages.contains(&Message::Reject(incoming_index)));
1279		assert_eq!(controller.num_out, 0);
1280		assert_eq!(controller.num_in, 0);
1281	}
1282
1283	#[test]
1284	fn disabling_reserved_only_mode_allows_to_connect_to_peers() {
1285		let peer1 = PeerId::random();
1286		let peer2 = PeerId::random();
1287		let candidates = vec![peer1.into(), peer2.into()];
1288
1289		let config = ProtoSetConfig {
1290			in_peers: 0,
1291			// Make sure we have slots available.
1292			out_peers: 10,
1293			reserved_nodes: HashSet::new(),
1294			reserved_only: true,
1295		};
1296		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1297
1298		let mut peer_store = MockPeerStoreHandle::new();
1299		peer_store.expect_register_protocol().once().return_const(());
1300		peer_store.expect_outgoing_candidates().once().return_const(candidates);
1301
1302		let (_handle, mut controller) =
1303			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1304
1305		// Initiate connections.
1306		controller.alloc_slots();
1307
1308		// No nodes are connected.
1309		assert_eq!(controller.num_out, 0);
1310		assert_eq!(controller.num_in, 0);
1311		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1312
1313		// Disable reserved-only mode (this also connects to peers).
1314		controller.on_set_reserved_only(false);
1315
1316		let mut messages = Vec::new();
1317		while let Some(message) = rx.try_recv().ok() {
1318			messages.push(message);
1319		}
1320
1321		assert_eq!(messages.len(), 2);
1322		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1323		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1324		assert_eq!(controller.num_out, 2);
1325		assert_eq!(controller.num_in, 0);
1326	}
1327
1328	#[test]
1329	fn enabling_reserved_only_mode_disconnects_regular_peers() {
1330		let reserved1 = PeerId::random();
1331		let reserved2 = PeerId::random();
1332		let regular1 = PeerId::random();
1333		let regular2 = PeerId::random();
1334		let outgoing_candidates = vec![regular1.into()];
1335
1336		let config = ProtoSetConfig {
1337			in_peers: 10,
1338			out_peers: 10,
1339			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1340			reserved_only: false,
1341		};
1342		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1343
1344		let mut peer_store = MockPeerStoreHandle::new();
1345		peer_store.expect_register_protocol().once().return_const(());
1346		peer_store.expect_is_banned().times(3).return_const(false);
1347		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1348
1349		let (_handle, mut controller) =
1350			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1351		assert_eq!(controller.num_out, 0);
1352		assert_eq!(controller.num_in, 0);
1353
1354		// Connect `regular1` as outbound.
1355		controller.alloc_slots();
1356
1357		let mut messages = Vec::new();
1358		while let Some(message) = rx.try_recv().ok() {
1359			messages.push(message);
1360		}
1361		assert_eq!(messages.len(), 3);
1362		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1363		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1364		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1365		assert_eq!(controller.num_out, 1);
1366		assert_eq!(controller.num_in, 0);
1367
1368		// Connect `regular2` as inbound.
1369		let incoming_index = IncomingIndex(1);
1370		controller.on_incoming_connection(regular2, incoming_index);
1371		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming_index));
1372		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1373		assert_eq!(controller.num_out, 1);
1374		assert_eq!(controller.num_in, 1);
1375
1376		// Switch to reserved-only mode.
1377		controller.on_set_reserved_only(true);
1378
1379		let mut messages = Vec::new();
1380		while let Some(message) = rx.try_recv().ok() {
1381			messages.push(message);
1382		}
1383		assert_eq!(messages.len(), 2);
1384		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular1 }));
1385		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular2 }));
1386		assert_eq!(controller.nodes.len(), 0);
1387		assert_eq!(controller.num_out, 0);
1388		assert_eq!(controller.num_in, 0);
1389	}
1390
1391	#[test]
1392	fn removed_disconnected_reserved_node_is_forgotten() {
1393		let reserved1 = PeerId::random();
1394		let reserved2 = PeerId::random();
1395
1396		let config = ProtoSetConfig {
1397			in_peers: 10,
1398			out_peers: 10,
1399			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1400			reserved_only: false,
1401		};
1402		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1403
1404		let mut peer_store = MockPeerStoreHandle::new();
1405		peer_store.expect_register_protocol().once().return_const(());
1406
1407		let (_handle, mut controller) =
1408			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1409		assert_eq!(controller.reserved_nodes.len(), 2);
1410		assert_eq!(controller.nodes.len(), 0);
1411		assert_eq!(controller.num_out, 0);
1412		assert_eq!(controller.num_in, 0);
1413
1414		controller.on_remove_reserved_peer(reserved1);
1415		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1416		assert_eq!(controller.reserved_nodes.len(), 1);
1417		assert!(!controller.reserved_nodes.contains_key(&reserved1));
1418		assert_eq!(controller.nodes.len(), 0);
1419		assert_eq!(controller.num_out, 0);
1420		assert_eq!(controller.num_in, 0);
1421	}
1422
1423	#[test]
1424	fn removed_connected_reserved_node_is_disconnected_in_reserved_only_mode() {
1425		let reserved1 = PeerId::random();
1426		let reserved2 = PeerId::random();
1427
1428		let config = ProtoSetConfig {
1429			in_peers: 10,
1430			out_peers: 10,
1431			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1432			reserved_only: true,
1433		};
1434		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1435
1436		let mut peer_store = MockPeerStoreHandle::new();
1437		peer_store.expect_register_protocol().once().return_const(());
1438		peer_store.expect_is_banned().times(2).return_const(false);
1439
1440		let (_handle, mut controller) =
1441			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1442
1443		// Initiate connections.
1444		controller.alloc_slots();
1445		let mut messages = Vec::new();
1446		while let Some(message) = rx.try_recv().ok() {
1447			messages.push(message);
1448		}
1449		assert_eq!(messages.len(), 2);
1450		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1451		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1452		assert_eq!(controller.reserved_nodes.len(), 2);
1453		assert!(controller.reserved_nodes.contains_key(&reserved1));
1454		assert!(controller.reserved_nodes.contains_key(&reserved2));
1455		assert!(controller.nodes.is_empty());
1456
1457		// Remove reserved node
1458		controller.on_remove_reserved_peer(reserved1);
1459		assert_eq!(
1460			rx.try_recv().unwrap(),
1461			Message::Drop { set_id: SetId::from(0), peer_id: reserved1 }
1462		);
1463		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1464		assert_eq!(controller.reserved_nodes.len(), 1);
1465		assert!(controller.reserved_nodes.contains_key(&reserved2));
1466		assert!(controller.nodes.is_empty());
1467	}
1468
1469	#[test]
1470	fn removed_connected_reserved_nodes_become_regular_in_non_reserved_mode() {
1471		let peer1 = PeerId::random();
1472		let peer2 = PeerId::random();
1473
1474		let config = ProtoSetConfig {
1475			in_peers: 10,
1476			out_peers: 10,
1477			reserved_nodes: [peer1, peer2].iter().cloned().collect(),
1478			reserved_only: false,
1479		};
1480		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1481
1482		let mut peer_store = MockPeerStoreHandle::new();
1483		peer_store.expect_register_protocol().once().return_const(());
1484		peer_store.expect_is_banned().times(2).return_const(false);
1485		peer_store
1486			.expect_outgoing_candidates()
1487			.once()
1488			.return_const(Vec::<pezsc_network_types::PeerId>::new());
1489
1490		let (_handle, mut controller) =
1491			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1492
1493		// Connect `peer1` as inbound, `peer2` as outbound.
1494		controller.on_incoming_connection(peer1, IncomingIndex(1));
1495		controller.alloc_slots();
1496		let mut messages = Vec::new();
1497		while let Some(message) = rx.try_recv().ok() {
1498			messages.push(message);
1499		}
1500		assert_eq!(messages.len(), 2);
1501		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1502		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1503		assert_eq!(controller.num_out, 0);
1504		assert_eq!(controller.num_in, 0);
1505
1506		// Remove reserved nodes (and make them regular)
1507		controller.on_remove_reserved_peer(peer1);
1508		controller.on_remove_reserved_peer(peer2);
1509		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1510		assert_eq!(controller.nodes.len(), 2);
1511		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Inbound)));
1512		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Outbound)));
1513		assert_eq!(controller.num_out, 1);
1514		assert_eq!(controller.num_in, 1);
1515	}
1516
1517	#[test]
1518	fn regular_nodes_stop_occupying_slots_when_become_reserved() {
1519		let peer1 = PeerId::random();
1520		let peer2 = PeerId::random();
1521		let outgoing_candidates = vec![peer1.into()];
1522
1523		let config = ProtoSetConfig {
1524			in_peers: 10,
1525			out_peers: 10,
1526			reserved_nodes: HashSet::new(),
1527			reserved_only: false,
1528		};
1529		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1530
1531		let mut peer_store = MockPeerStoreHandle::new();
1532		peer_store.expect_register_protocol().once().return_const(());
1533		peer_store.expect_is_banned().once().return_const(false);
1534		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1535
1536		let (_handle, mut controller) =
1537			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1538
1539		// Connect `peer1` as outbound & `peer2` as inbound.
1540		controller.alloc_slots();
1541		controller.on_incoming_connection(peer2, IncomingIndex(1));
1542		let mut messages = Vec::new();
1543		while let Some(message) = rx.try_recv().ok() {
1544			messages.push(message);
1545		}
1546		assert_eq!(messages.len(), 2);
1547		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1548		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1549		assert_eq!(controller.num_in, 1);
1550		assert_eq!(controller.num_out, 1);
1551
1552		controller.on_add_reserved_peer(peer1);
1553		controller.on_add_reserved_peer(peer2);
1554		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1555		assert_eq!(controller.num_in, 0);
1556		assert_eq!(controller.num_out, 0);
1557	}
1558
1559	#[test]
1560	fn disconnecting_regular_peers_work() {
1561		let peer1 = PeerId::random();
1562		let peer2 = PeerId::random();
1563		let outgoing_candidates = vec![peer1.into()];
1564
1565		let config = ProtoSetConfig {
1566			in_peers: 10,
1567			out_peers: 10,
1568			reserved_nodes: HashSet::new(),
1569			reserved_only: false,
1570		};
1571		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1572
1573		let mut peer_store = MockPeerStoreHandle::new();
1574		peer_store.expect_register_protocol().once().return_const(());
1575		peer_store.expect_is_banned().once().return_const(false);
1576		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1577
1578		let (_handle, mut controller) =
1579			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1580
1581		// Connect `peer1` as outbound & `peer2` as inbound.
1582		controller.alloc_slots();
1583		controller.on_incoming_connection(peer2, IncomingIndex(1));
1584		let mut messages = Vec::new();
1585		while let Some(message) = rx.try_recv().ok() {
1586			messages.push(message);
1587		}
1588		assert_eq!(messages.len(), 2);
1589		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1590		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1591		assert_eq!(controller.nodes.len(), 2);
1592		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1593		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1594		assert_eq!(controller.num_in, 1);
1595		assert_eq!(controller.num_out, 1);
1596
1597		controller.on_disconnect_peer(peer1);
1598		assert_eq!(
1599			rx.try_recv().unwrap(),
1600			Message::Drop { set_id: SetId::from(0), peer_id: peer1 }
1601		);
1602		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1603		assert_eq!(controller.nodes.len(), 1);
1604		assert!(!controller.nodes.contains_key(&peer1));
1605		assert_eq!(controller.num_in, 1);
1606		assert_eq!(controller.num_out, 0);
1607
1608		controller.on_disconnect_peer(peer2);
1609		assert_eq!(
1610			rx.try_recv().unwrap(),
1611			Message::Drop { set_id: SetId::from(0), peer_id: peer2 }
1612		);
1613		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1614		assert_eq!(controller.nodes.len(), 0);
1615		assert_eq!(controller.num_in, 0);
1616		assert_eq!(controller.num_out, 0);
1617	}
1618
1619	#[test]
1620	fn disconnecting_reserved_peers_is_a_noop() {
1621		let reserved1 = PeerId::random();
1622		let reserved2 = PeerId::random();
1623
1624		let config = ProtoSetConfig {
1625			in_peers: 10,
1626			out_peers: 10,
1627			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1628			reserved_only: false,
1629		};
1630		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1631
1632		let mut peer_store = MockPeerStoreHandle::new();
1633		peer_store.expect_register_protocol().once().return_const(());
1634		peer_store.expect_is_banned().times(2).return_const(false);
1635		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1636
1637		let (_handle, mut controller) =
1638			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1639
1640		// Connect `reserved1` as inbound & `reserved2` as outbound.
1641		controller.on_incoming_connection(reserved1, IncomingIndex(1));
1642		controller.alloc_slots();
1643		let mut messages = Vec::new();
1644		while let Some(message) = rx.try_recv().ok() {
1645			messages.push(message);
1646		}
1647		assert_eq!(messages.len(), 2);
1648		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1649		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1650		assert!(matches!(
1651			controller.reserved_nodes.get(&reserved1),
1652			Some(PeerState::Connected(Direction::Inbound))
1653		));
1654		assert!(matches!(
1655			controller.reserved_nodes.get(&reserved2),
1656			Some(PeerState::Connected(Direction::Outbound))
1657		));
1658
1659		controller.on_disconnect_peer(reserved1);
1660		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1661		assert!(matches!(
1662			controller.reserved_nodes.get(&reserved1),
1663			Some(PeerState::Connected(Direction::Inbound))
1664		));
1665
1666		controller.on_disconnect_peer(reserved2);
1667		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1668		assert!(matches!(
1669			controller.reserved_nodes.get(&reserved2),
1670			Some(PeerState::Connected(Direction::Outbound))
1671		));
1672	}
1673
1674	#[test]
1675	fn dropping_regular_peers_work() {
1676		let peer1 = PeerId::random();
1677		let peer2 = PeerId::random();
1678		let outgoing_candidates = vec![peer1.into()];
1679
1680		let config = ProtoSetConfig {
1681			in_peers: 10,
1682			out_peers: 10,
1683			reserved_nodes: HashSet::new(),
1684			reserved_only: false,
1685		};
1686		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1687
1688		let mut peer_store = MockPeerStoreHandle::new();
1689		peer_store.expect_register_protocol().once().return_const(());
1690		peer_store.expect_is_banned().once().return_const(false);
1691		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1692		peer_store.expect_report_disconnect().times(2).return_const(());
1693
1694		let (_handle, mut controller) =
1695			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1696
1697		// Connect `peer1` as outbound & `peer2` as inbound.
1698		controller.alloc_slots();
1699		controller.on_incoming_connection(peer2, IncomingIndex(1));
1700		let mut messages = Vec::new();
1701		while let Some(message) = rx.try_recv().ok() {
1702			messages.push(message);
1703		}
1704		assert_eq!(messages.len(), 2);
1705		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1706		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1707		assert_eq!(controller.nodes.len(), 2);
1708		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1709		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1710		assert_eq!(controller.num_in, 1);
1711		assert_eq!(controller.num_out, 1);
1712
1713		controller.on_peer_dropped(peer1);
1714		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1715		assert_eq!(controller.nodes.len(), 1);
1716		assert!(!controller.nodes.contains_key(&peer1));
1717		assert_eq!(controller.num_in, 1);
1718		assert_eq!(controller.num_out, 0);
1719
1720		controller.on_peer_dropped(peer2);
1721		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1722		assert_eq!(controller.nodes.len(), 0);
1723		assert_eq!(controller.num_in, 0);
1724		assert_eq!(controller.num_out, 0);
1725	}
1726
1727	#[test]
1728	fn incoming_request_for_connected_reserved_node_switches_it_to_inbound() {
1729		let reserved1 = PeerId::random();
1730		let reserved2 = PeerId::random();
1731
1732		let config = ProtoSetConfig {
1733			in_peers: 10,
1734			out_peers: 10,
1735			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1736			reserved_only: false,
1737		};
1738		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1739
1740		let mut peer_store = MockPeerStoreHandle::new();
1741		peer_store.expect_register_protocol().once().return_const(());
1742		peer_store.expect_is_banned().times(2).return_const(false);
1743		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1744
1745		let (_handle, mut controller) =
1746			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1747
1748		// Connect `reserved1` as inbound & `reserved2` as outbound.
1749		controller.on_incoming_connection(reserved1, IncomingIndex(1));
1750		controller.alloc_slots();
1751		let mut messages = Vec::new();
1752		while let Some(message) = rx.try_recv().ok() {
1753			messages.push(message);
1754		}
1755		assert_eq!(messages.len(), 2);
1756		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1757		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1758		assert!(matches!(
1759			controller.reserved_nodes.get(&reserved1),
1760			Some(PeerState::Connected(Direction::Inbound))
1761		));
1762		assert!(matches!(
1763			controller.reserved_nodes.get(&reserved2),
1764			Some(PeerState::Connected(Direction::Outbound))
1765		));
1766
1767		// Incoming request for `reserved1`.
1768		controller.on_incoming_connection(reserved1, IncomingIndex(2));
1769		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
1770		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1771		assert!(matches!(
1772			controller.reserved_nodes.get(&reserved1),
1773			Some(PeerState::Connected(Direction::Inbound))
1774		));
1775
1776		// Incoming request for `reserved2`.
1777		controller.on_incoming_connection(reserved2, IncomingIndex(3));
1778		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(3)));
1779		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1780		assert!(matches!(
1781			controller.reserved_nodes.get(&reserved2),
1782			Some(PeerState::Connected(Direction::Inbound))
1783		));
1784	}
1785
1786	#[test]
1787	fn incoming_request_for_connected_regular_node_switches_it_to_inbound() {
1788		let regular1 = PeerId::random();
1789		let regular2 = PeerId::random();
1790		let outgoing_candidates = vec![regular1.into()];
1791
1792		let config = ProtoSetConfig {
1793			in_peers: 10,
1794			out_peers: 10,
1795			reserved_nodes: HashSet::new(),
1796			reserved_only: false,
1797		};
1798		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1799
1800		let mut peer_store = MockPeerStoreHandle::new();
1801		peer_store.expect_register_protocol().once().return_const(());
1802		peer_store.expect_is_banned().times(3).return_const(false);
1803		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1804
1805		let (_handle, mut controller) =
1806			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1807		assert_eq!(controller.num_out, 0);
1808		assert_eq!(controller.num_in, 0);
1809
1810		// Connect `regular1` as outbound.
1811		controller.alloc_slots();
1812		assert_eq!(
1813			rx.try_recv().ok().unwrap(),
1814			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1815		);
1816		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1817		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1818
1819		// Connect `regular2` as inbound.
1820		controller.on_incoming_connection(regular2, IncomingIndex(0));
1821		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1822		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1823		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1824
1825		// Incoming request for `regular1`.
1826		controller.on_incoming_connection(regular1, IncomingIndex(1));
1827		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(1)));
1828		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1829		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Inbound,));
1830
1831		// Incoming request for `regular2`.
1832		controller.on_incoming_connection(regular2, IncomingIndex(2));
1833		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
1834		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1835		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1836	}
1837
1838	#[test]
1839	fn incoming_request_for_connected_node_is_rejected_if_its_banned() {
1840		let regular1 = PeerId::random();
1841		let regular2 = PeerId::random();
1842		let outgoing_candidates = vec![regular1.into()];
1843
1844		let config = ProtoSetConfig {
1845			in_peers: 10,
1846			out_peers: 10,
1847			reserved_nodes: HashSet::new(),
1848			reserved_only: false,
1849		};
1850		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1851
1852		let mut peer_store = MockPeerStoreHandle::new();
1853		peer_store.expect_register_protocol().once().return_const(());
1854		peer_store.expect_is_banned().once().return_const(false);
1855		peer_store.expect_is_banned().times(2).return_const(true);
1856		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1857
1858		let (_handle, mut controller) =
1859			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1860		assert_eq!(controller.num_out, 0);
1861		assert_eq!(controller.num_in, 0);
1862
1863		// Connect `regular1` as outbound.
1864		controller.alloc_slots();
1865		assert_eq!(
1866			rx.try_recv().ok().unwrap(),
1867			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1868		);
1869		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1870		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1871
1872		// Connect `regular2` as inbound.
1873		controller.on_incoming_connection(regular2, IncomingIndex(0));
1874		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1875		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1876		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1877
1878		// Incoming request for `regular1`.
1879		controller.on_incoming_connection(regular1, IncomingIndex(1));
1880		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
1881		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1882		assert!(!controller.nodes.contains_key(&regular1));
1883
1884		// Incoming request for `regular2`.
1885		controller.on_incoming_connection(regular2, IncomingIndex(2));
1886		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
1887		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1888		assert!(!controller.nodes.contains_key(&regular2));
1889	}
1890
1891	#[test]
1892	fn incoming_request_for_connected_node_is_rejected_if_no_slots_available() {
1893		let regular1 = PeerId::random();
1894		let regular2 = PeerId::random();
1895		let outgoing_candidates = vec![regular1.into()];
1896
1897		let config = ProtoSetConfig {
1898			in_peers: 1,
1899			out_peers: 1,
1900			reserved_nodes: HashSet::new(),
1901			reserved_only: false,
1902		};
1903		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1904
1905		let mut peer_store = MockPeerStoreHandle::new();
1906		peer_store.expect_register_protocol().once().return_const(());
1907		peer_store.expect_is_banned().once().return_const(false);
1908		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1909
1910		let (_handle, mut controller) =
1911			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1912		assert_eq!(controller.num_out, 0);
1913		assert_eq!(controller.num_in, 0);
1914
1915		// Connect `regular1` as outbound.
1916		controller.alloc_slots();
1917		assert_eq!(
1918			rx.try_recv().ok().unwrap(),
1919			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1920		);
1921		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1922		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1923
1924		// Connect `regular2` as inbound.
1925		controller.on_incoming_connection(regular2, IncomingIndex(0));
1926		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1927		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1928		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1929
1930		controller.max_in = 0;
1931
1932		// Incoming request for `regular1`.
1933		controller.on_incoming_connection(regular1, IncomingIndex(1));
1934		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
1935		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1936		assert!(!controller.nodes.contains_key(&regular1));
1937
1938		// Incoming request for `regular2`.
1939		controller.on_incoming_connection(regular2, IncomingIndex(2));
1940		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
1941		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1942		assert!(!controller.nodes.contains_key(&regular2));
1943	}
1944
1945	#[test]
1946	fn incoming_peers_that_exceed_slots_are_rejected() {
1947		let peer1 = PeerId::random();
1948		let peer2 = PeerId::random();
1949
1950		let config = ProtoSetConfig {
1951			in_peers: 1,
1952			out_peers: 10,
1953			reserved_nodes: HashSet::new(),
1954			reserved_only: false,
1955		};
1956		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1957
1958		let mut peer_store = MockPeerStoreHandle::new();
1959		peer_store.expect_register_protocol().once().return_const(());
1960		peer_store.expect_is_banned().once().return_const(false);
1961
1962		let (_handle, mut controller) =
1963			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1964
1965		// Connect `peer1` as inbound.
1966		controller.on_incoming_connection(peer1, IncomingIndex(1));
1967		assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(1)));
1968		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1969
1970		// Incoming requests for `peer2`.
1971		controller.on_incoming_connection(peer2, IncomingIndex(2));
1972		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
1973		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1974	}
1975
1976	#[test]
1977	fn banned_regular_incoming_node_is_rejected() {
1978		let peer1 = PeerId::random();
1979
1980		let config = ProtoSetConfig {
1981			in_peers: 10,
1982			out_peers: 10,
1983			reserved_nodes: HashSet::new(),
1984			reserved_only: false,
1985		};
1986		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1987
1988		let mut peer_store = MockPeerStoreHandle::new();
1989		peer_store.expect_register_protocol().once().return_const(());
1990		peer_store.expect_is_banned().once().return_const(true);
1991
1992		let (_handle, mut controller) =
1993			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1994
1995		// Incoming request.
1996		controller.on_incoming_connection(peer1, IncomingIndex(1));
1997		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
1998		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1999	}
2000
2001	#[test]
2002	fn banned_reserved_incoming_node_is_rejected() {
2003		let reserved1 = PeerId::random();
2004
2005		let config = ProtoSetConfig {
2006			in_peers: 10,
2007			out_peers: 10,
2008			reserved_nodes: std::iter::once(reserved1).collect(),
2009			reserved_only: false,
2010		};
2011		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
2012
2013		let mut peer_store = MockPeerStoreHandle::new();
2014		peer_store.expect_register_protocol().once().return_const(());
2015		peer_store.expect_is_banned().once().return_const(true);
2016
2017		let (_handle, mut controller) =
2018			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
2019		assert!(controller.reserved_nodes.contains_key(&reserved1));
2020
2021		// Incoming request.
2022		controller.on_incoming_connection(reserved1, IncomingIndex(1));
2023		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
2024		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
2025	}
2026
2027	#[test]
2028	fn we_dont_connect_to_banned_reserved_node() {
2029		let reserved1 = PeerId::random();
2030
2031		let config = ProtoSetConfig {
2032			in_peers: 10,
2033			out_peers: 10,
2034			reserved_nodes: std::iter::once(reserved1).collect(),
2035			reserved_only: false,
2036		};
2037		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
2038
2039		let mut peer_store = MockPeerStoreHandle::new();
2040		peer_store.expect_register_protocol().once().return_const(());
2041		peer_store.expect_is_banned().once().return_const(true);
2042		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
2043
2044		let (_handle, mut controller) =
2045			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
2046		assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
2047
2048		// Initiate connections
2049		controller.alloc_slots();
2050		assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
2051		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
2052	}
2053}