Skip to main content

sc_network/
protocol_controller.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Protocol Controller. Generic implementation of peer management for protocols.
20//! Responsible for accepting/rejecting incoming connections and initiating outgoing connections,
21//! respecting the inbound and outbound peer slot counts. Communicates with `PeerStore` to get and
22//! update peer reputation values and sends commands to `Notifications`.
23//!
//! Due to the asynchronous nature of communication between `ProtocolController` and
//! `Notifications`, `ProtocolController` has an imperfect view of the states of the peers. To
//! reduce this desynchronization, the following measures are taken:
27//!
28//! 1. Network peer events from `Notifications` are prioritized over actions from external API and
29//!    internal actions by `ProtocolController` (like slot allocation).
30//! 2. `Notifications` ignores all commands from `ProtocolController` after sending "incoming"
31//!    request until receiving the answer to this "incoming" request.
32//! 3. After sending a "connect" message, `ProtocolController` switches the state of the peer from
33//!    `Outbound` to `Inbound` if it receives an "incoming" request from `Notifications` for this
34//!    peer.
35//!
36//! These measures do not eliminate confusing commands from `ProtocolController` completely,
37//! so `Notifications` must correctly handle seemingly inconsistent commands, like a "connect"
38//! command for the peer it thinks is already connected, and a "drop" command for a peer that
39//! was previously dropped.
40//!
//! Even though this does not guarantee that `ProtocolController` and `Notifications` have the same
//! view of the peers' states at any given moment, eventual consistency is maintained.
43
44use crate::peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT};
45
46use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
47use libp2p::PeerId;
48use log::{debug, error, trace, warn};
49use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
50use sp_arithmetic::traits::SaturatedConversion;
51use std::{
52	collections::{HashMap, HashSet},
53	sync::Arc,
54	time::{Duration, Instant},
55};
56use wasm_timer::Delay;
57
/// Log target for this file. The logging macro calls in this file pass it as `target:`.
pub const LOG_TARGET: &str = "peerset";
60
/// Index of a protocol inside `Notifications`. The name `SetId` is historical: the value used
/// to refer to a set of peers in a peerset for this protocol.
///
/// Build it from the protocol's index in `Notifications`, either through the `From<usize>`
/// implementation or through the `const` constructor [`SetId::from`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);

impl SetId {
	/// `const` counterpart of `From<usize>`, for initializing hardcoded peerset indices.
	pub const fn from(id: usize) -> Self {
		SetId(id)
	}
}

impl From<usize> for SetId {
	/// Wrap a raw protocol index.
	fn from(id: usize) -> Self {
		SetId(id)
	}
}

impl From<SetId> for usize {
	/// Unwrap the raw protocol index.
	fn from(id: SetId) -> Self {
		let SetId(index) = id;
		index
	}
}
87
/// Configuration for a set of nodes for a specific protocol. Passed to
/// [`ProtocolController::new`], which copies the values into the controller.
#[derive(Debug)]
pub struct ProtoSetConfig {
	/// Maximum number of incoming links to peers.
	pub in_peers: u32,

	/// Maximum number of outgoing links to peers.
	pub out_peers: u32,

	/// Lists of nodes we should always be connected to.
	///
	/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
	/// > otherwise it will not be able to connect to them.
	pub reserved_nodes: HashSet<PeerId>,

	/// If true, we only accept nodes in [`ProtoSetConfig::reserved_nodes`].
	pub reserved_only: bool,
}
106
/// Message that is sent by [`ProtocolController`] to `Notifications`.
///
/// `Connect`/`Drop` address a peer directly, while `Accept`/`Reject` answer an incoming
/// connection identified by its [`IncomingIndex`].
#[derive(Debug, PartialEq)]
pub enum Message {
	/// Request to open a connection to the given peer. From the point of view of the
	/// `ProtocolController`, we are immediately connected.
	Connect {
		/// Set id to connect on.
		set_id: SetId,
		/// Peer to connect to.
		peer_id: PeerId,
	},

	/// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`.
	Drop {
		/// Set id to disconnect on.
		set_id: SetId,
		/// Peer to disconnect from.
		peer_id: PeerId,
	},

	/// Equivalent to `Connect` for the peer corresponding to this incoming index.
	Accept(IncomingIndex),

	/// Equivalent to `Drop` for the peer corresponding to this incoming index.
	Reject(IncomingIndex),
}
133
/// Opaque identifier of an incoming connection. The value is allocated by the network and is
/// echoed back in [`Message::Accept`] / [`Message::Reject`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IncomingIndex(pub u64);

impl From<u64> for IncomingIndex {
	/// Wrap a raw index.
	fn from(val: u64) -> Self {
		IncomingIndex(val)
	}
}
143
/// External API actions. Sent by [`ProtocolHandle`] over the actions channel and handled in
/// `ProtocolController::process_action`.
#[derive(Debug)]
enum Action {
	/// Add a reserved peer or mark already connected peer as reserved.
	AddReservedPeer(PeerId),
	/// Remove a reserved peer.
	RemoveReservedPeer(PeerId),
	/// Update reserved peers to match the provided set.
	SetReservedPeers(HashSet<PeerId>),
	/// Set/unset reserved-only mode.
	SetReservedOnly(bool),
	/// Disconnect a peer.
	DisconnectPeer(PeerId),
	/// Get the list of reserved peers. The list is sent back through the enclosed sender.
	GetReservedPeers(oneshot::Sender<Vec<PeerId>>),
}
160
/// Network events from `Notifications`. Sent by [`ProtocolHandle`] over the events channel and
/// handled in `ProtocolController::process_event`.
#[derive(Debug)]
enum Event {
	/// Incoming connection from the peer, identified by the given incoming index.
	IncomingConnection(PeerId, IncomingIndex),
	/// Connection with the peer dropped.
	Dropped(PeerId),
}
169
/// Shared handle to [`ProtocolController`]. Distributed around the code outside of the
/// protocol implementation.
///
/// The handle only holds the sending halves of two channels; the matching receivers live in
/// [`ProtocolController`].
#[derive(Debug, Clone)]
pub struct ProtocolHandle {
	/// Actions from outer API.
	actions_tx: TracingUnboundedSender<Action>,
	/// Connection events from `Notifications`. We prioritize them over actions.
	events_tx: TracingUnboundedSender<Event>,
}
179
180impl ProtocolHandle {
181	/// Adds a new reserved peer. [`ProtocolController`] will make an effort
182	/// to always remain connected to this peer.
183	///
184	/// Has no effect if the node was already a reserved peer.
185	///
186	/// > **Note**: Keep in mind that the networking has to know an address for this node,
187	/// > otherwise it will not be able to connect to it.
188	pub fn add_reserved_peer(&self, peer_id: PeerId) {
189		let _ = self.actions_tx.unbounded_send(Action::AddReservedPeer(peer_id));
190	}
191
192	/// Demotes reserved peer to non-reserved. Does not disconnect the peer.
193	///
194	/// Has no effect if the node was not a reserved peer.
195	pub fn remove_reserved_peer(&self, peer_id: PeerId) {
196		let _ = self.actions_tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
197	}
198
199	/// Set reserved peers to the new set.
200	pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
201		let _ = self.actions_tx.unbounded_send(Action::SetReservedPeers(peer_ids));
202	}
203
204	/// Sets whether or not [`ProtocolController`] only has connections with nodes marked
205	/// as reserved for the given set.
206	pub fn set_reserved_only(&self, reserved: bool) {
207		let _ = self.actions_tx.unbounded_send(Action::SetReservedOnly(reserved));
208	}
209
210	/// Disconnect peer. You should remove the `PeerId` from the `PeerStore` first
211	/// to not connect to the peer again during the next slot allocation.
212	pub fn disconnect_peer(&self, peer_id: PeerId) {
213		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id));
214	}
215
216	/// Get the list of reserved peers.
217	pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
218		let _ = self.actions_tx.unbounded_send(Action::GetReservedPeers(pending_response));
219	}
220
221	/// Notify about incoming connection. [`ProtocolController`] will either accept or reject it.
222	pub fn incoming_connection(&self, peer_id: PeerId, incoming_index: IncomingIndex) {
223		let _ = self
224			.events_tx
225			.unbounded_send(Event::IncomingConnection(peer_id, incoming_index));
226	}
227
228	/// Notify that connection was dropped (either refused or disconnected).
229	pub fn dropped(&self, peer_id: PeerId) {
230		let _ = self.events_tx.unbounded_send(Event::Dropped(peer_id));
231	}
232}
233
234impl ProtocolHandleT for ProtocolHandle {
235	fn disconnect_peer(&self, peer_id: sc_network_types::PeerId) {
236		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id.into()));
237	}
238}
239
/// Direction of a connection.
#[derive(Clone, Copy, Debug)]
enum Direction {
	/// The connection is counted as incoming.
	Inbound,
	/// The connection is counted as outgoing.
	Outbound,
}

/// Status of a connection with a peer. The default state is [`PeerState::NotConnected`].
#[derive(Clone, Debug, Default)]
enum PeerState {
	/// We are connected to the peer.
	Connected(Direction),
	/// We are not connected.
	#[default]
	NotConnected,
}

impl PeerState {
	/// Returns true if we are connected with the node.
	fn is_connected(&self) -> bool {
		match self {
			PeerState::Connected(_) => true,
			PeerState::NotConnected => false,
		}
	}
}
268
/// Worker side of [`ProtocolHandle`] responsible for all the logic.
///
/// Holds the receiving halves of the channels fed by [`ProtocolHandle`], the slot counters,
/// and the bookkeeping of regular and reserved peers.
#[derive(Debug)]
pub struct ProtocolController {
	/// Set id to use when sending connect/drop requests to `Notifications`.
	// Will likely be replaced by `ProtocolName` in the future.
	set_id: SetId,
	/// Receiver for outer API messages from [`ProtocolHandle`].
	actions_rx: TracingUnboundedReceiver<Action>,
	/// Receiver for connection events from `Notifications` sent via [`ProtocolHandle`].
	events_rx: TracingUnboundedReceiver<Event>,
	/// Number of occupied slots for incoming connections (not counting reserved nodes).
	num_in: u32,
	/// Number of occupied slots for outgoing connections (not counting reserved nodes).
	num_out: u32,
	/// Maximum number of slots for incoming connections (not counting reserved nodes).
	max_in: u32,
	/// Maximum number of slots for outgoing connections (not counting reserved nodes).
	max_out: u32,
	/// Connected regular nodes, keyed by peer id, with the direction each one is counted under.
	nodes: HashMap<PeerId, Direction>,
	/// Reserved nodes. Should be always connected and do not occupy peer slots.
	reserved_nodes: HashMap<PeerId, PeerState>,
	/// Connect only to reserved nodes.
	reserved_only: bool,
	/// Next time to allocate slots. This is done once per second.
	next_periodic_alloc_slots: Instant,
	/// Outgoing channel for messages to `Notifications`.
	to_notifications: TracingUnboundedSender<Message>,
	/// `PeerStore` handle for checking peer reputation values and getting connection candidates
	/// with highest reputation.
	peer_store: Arc<dyn PeerStoreProvider>,
}
301
302impl ProtocolController {
303	/// Construct new [`ProtocolController`].
304	pub fn new(
305		set_id: SetId,
306		config: ProtoSetConfig,
307		to_notifications: TracingUnboundedSender<Message>,
308		peer_store: Arc<dyn PeerStoreProvider>,
309	) -> (ProtocolHandle, ProtocolController) {
310		let (actions_tx, actions_rx) = tracing_unbounded("mpsc_api_protocol", 10_000);
311		let (events_tx, events_rx) = tracing_unbounded("mpsc_notifications_protocol", 10_000);
312		let handle = ProtocolHandle { actions_tx, events_tx };
313		peer_store.register_protocol(Arc::new(handle.clone()));
314		let reserved_nodes =
315			config.reserved_nodes.iter().map(|p| (*p, PeerState::NotConnected)).collect();
316		let controller = ProtocolController {
317			set_id,
318			actions_rx,
319			events_rx,
320			num_in: 0,
321			num_out: 0,
322			max_in: config.in_peers,
323			max_out: config.out_peers,
324			nodes: HashMap::new(),
325			reserved_nodes,
326			reserved_only: config.reserved_only,
327			next_periodic_alloc_slots: Instant::now(),
328			to_notifications,
329			peer_store,
330		};
331		(handle, controller)
332	}
333
334	/// Drive [`ProtocolController`]. This function returns when all instances of
335	/// [`ProtocolHandle`] are dropped.
336	pub async fn run(mut self) {
337		while self.next_action().await {}
338	}
339
	/// Perform one action. Returns `true` if it should be called again.
	///
	/// Waits for either a network event or an external API action and processes exactly one
	/// of them. While waiting, the periodic slot allocation timer may fire any number of times;
	/// each time slots are allocated and the wait continues. Returns `false` as soon as either
	/// channel yields `None`.
	///
	/// Intended for tests only. Use `run` for driving [`ProtocolController`].
	pub async fn next_action(&mut self) -> bool {
		let either = loop {
			// A fresh timer is built from the stored deadline on every loop iteration.
			let mut next_alloc_slots = Delay::new_at(self.next_periodic_alloc_slots).fuse();

			// See the module doc for why we use `select_biased!`.
			// The branches are listed in priority order: events, then actions, then the timer.
			futures::select_biased! {
				event = self.events_rx.next() => match event {
					Some(event) => break Either::Left(event),
					None => return false,
				},
				action = self.actions_rx.next() => match action {
					Some(action) => break Either::Right(action),
					None => return false,
				},
				_ = next_alloc_slots => {
					// The timer fired: allocate slots, schedule the next allocation one second
					// from now, and keep waiting for an event or action.
					self.alloc_slots();
					self.next_periodic_alloc_slots = Instant::now() + Duration::new(1, 0);
				},
			}
		};

		// The event or action is processed only after leaving the `select_biased!` loop.
		match either {
			Either::Left(event) => self.process_event(event),
			Either::Right(action) => self.process_action(action),
		}

		true
	}
371
372	/// Process connection event.
373	fn process_event(&mut self, event: Event) {
374		match event {
375			Event::IncomingConnection(peer_id, index) => {
376				self.on_incoming_connection(peer_id, index)
377			},
378			Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
379		}
380	}
381
382	/// Process action command.
383	fn process_action(&mut self, action: Action) {
384		match action {
385			Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
386			Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
387			Action::SetReservedPeers(peer_ids) => self.on_set_reserved_peers(peer_ids),
388			Action::SetReservedOnly(reserved_only) => self.on_set_reserved_only(reserved_only),
389			Action::DisconnectPeer(peer_id) => self.on_disconnect_peer(peer_id),
390			Action::GetReservedPeers(pending_response) => {
391				self.on_get_reserved_peers(pending_response)
392			},
393		}
394	}
395
396	/// Send "accept" message to `Notifications`.
397	fn accept_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
398		trace!(
399			target: LOG_TARGET,
400			"Accepting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
401			self.set_id,
402			self.num_in,
403			self.max_in,
404		);
405
406		let _ = self.to_notifications.unbounded_send(Message::Accept(incoming_index));
407	}
408
409	/// Send "reject" message to `Notifications`.
410	fn reject_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
411		trace!(
412			target: LOG_TARGET,
413			"Rejecting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
414			self.set_id,
415			self.num_in,
416			self.max_in,
417		);
418
419		let _ = self.to_notifications.unbounded_send(Message::Reject(incoming_index));
420	}
421
422	/// Send "connect" message to `Notifications`.
423	fn start_connection(&mut self, peer_id: PeerId) {
424		trace!(
425			target: LOG_TARGET,
426			"Connecting to {peer_id} on {:?} ({}/{} num_out/max_out).",
427			self.set_id,
428			self.num_out,
429			self.max_out,
430		);
431
432		let _ = self
433			.to_notifications
434			.unbounded_send(Message::Connect { set_id: self.set_id, peer_id });
435	}
436
437	/// Send "drop" message to `Notifications`.
438	fn drop_connection(&mut self, peer_id: PeerId) {
439		trace!(
440			target: LOG_TARGET,
441			"Dropping {peer_id} on {:?} ({}/{} num_in/max_in, {}/{} num_out/max_out).",
442			self.set_id,
443			self.num_in,
444			self.max_in,
445			self.num_out,
446			self.max_out,
447		);
448
449		let _ = self
450			.to_notifications
451			.unbounded_send(Message::Drop { set_id: self.set_id, peer_id });
452	}
453
454	/// Report peer disconnect event to `PeerStore` for it to update peer's reputation accordingly.
455	/// Should only be called if the remote node disconnected us, not the other way around.
456	fn report_disconnect(&mut self, peer_id: PeerId) {
457		self.peer_store.report_disconnect(peer_id.into());
458	}
459
460	/// Ask `Peerset` if the peer has a reputation value not sufficient for connection with it.
461	fn is_banned(&self, peer_id: &PeerId) -> bool {
462		self.peer_store.is_banned(&peer_id.into())
463	}
464
465	/// Add the peer to the set of reserved peers. [`ProtocolController`] will try to always
466	/// maintain connections with such peers.
467	fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
468		if self.reserved_nodes.contains_key(&peer_id) {
469			debug!(
470				target: LOG_TARGET,
471				"Trying to add an already reserved node {peer_id} as reserved on {:?}.",
472				self.set_id,
473			);
474			return;
475		}
476
477		// Get the peer out of non-reserved peers if it's there.
478		let state = match self.nodes.remove(&peer_id) {
479			Some(direction) => {
480				trace!(
481					target: LOG_TARGET,
482					"Marking previously connected node {} ({:?}) as reserved on {:?}.",
483					peer_id,
484					direction,
485					self.set_id
486				);
487				PeerState::Connected(direction)
488			},
489			None => {
490				trace!(target: LOG_TARGET, "Adding reserved node {peer_id} on {:?}.", self.set_id,);
491				PeerState::NotConnected
492			},
493		};
494
495		self.reserved_nodes.insert(peer_id, state.clone());
496
497		// Discount occupied slots or connect to the node.
498		match state {
499			PeerState::Connected(Direction::Inbound) => self.num_in -= 1,
500			PeerState::Connected(Direction::Outbound) => self.num_out -= 1,
501			PeerState::NotConnected => self.alloc_slots(),
502		}
503	}
504
505	/// Remove the peer from the set of reserved peers. The peer is either moved to the set of
506	/// regular nodes or disconnected.
507	fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
508		let state = match self.reserved_nodes.remove(&peer_id) {
509			Some(state) => state,
510			None => {
511				warn!(
512					target: LOG_TARGET,
513					"Trying to remove unknown reserved node {peer_id} from {:?}.", self.set_id,
514				);
515				return;
516			},
517		};
518
519		if let PeerState::Connected(direction) = state {
520			// Disconnect if we're at (or over) the regular node limit
521			let disconnect = self.reserved_only ||
522				match direction {
523					Direction::Inbound => self.num_in >= self.max_in,
524					Direction::Outbound => self.num_out >= self.max_out,
525				};
526
527			if disconnect {
528				// Disconnect the node.
529				trace!(
530					target: LOG_TARGET,
531					"Disconnecting previously reserved node {peer_id} ({direction:?}) on {:?}.",
532					self.set_id,
533				);
534				self.drop_connection(peer_id);
535			} else {
536				// Count connections as of regular node.
537				trace!(
538					target: LOG_TARGET,
539					"Making a connected reserved node {peer_id} ({:?}) on {:?} a regular one.",
540					direction,
541					self.set_id,
542				);
543
544				match direction {
545					Direction::Inbound => self.num_in += 1,
546					Direction::Outbound => self.num_out += 1,
547				}
548
549				// Put the node into the list of regular nodes.
550				let prev = self.nodes.insert(peer_id, direction);
551				assert!(prev.is_none(), "Corrupted state: reserved node was also non-reserved.");
552			}
553		} else {
554			trace!(
555				target: LOG_TARGET,
556				"Removed disconnected reserved node {peer_id} from {:?}.",
557				self.set_id,
558			);
559		}
560	}
561
562	/// Replace the set of reserved peers.
563	fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
564		// Determine the difference between the current group and the new list.
565		let current = self.reserved_nodes.keys().cloned().collect();
566		let to_insert = peer_ids.difference(&current).cloned().collect::<Vec<_>>();
567		let to_remove = current.difference(&peer_ids).cloned().collect::<Vec<_>>();
568
569		for node in to_insert {
570			self.on_add_reserved_peer(node);
571		}
572
573		for node in to_remove {
574			self.on_remove_reserved_peer(node);
575		}
576	}
577
578	/// Change "reserved only" flag. In "reserved only" mode we connect and accept connections to
579	/// reserved nodes only.
580	fn on_set_reserved_only(&mut self, reserved_only: bool) {
581		trace!(target: LOG_TARGET, "Set reserved only to `{reserved_only}` on {:?}", self.set_id);
582
583		self.reserved_only = reserved_only;
584
585		if !reserved_only {
586			return self.alloc_slots();
587		}
588
589		// Disconnect all non-reserved peers.
590		self.nodes
591			.iter()
592			.map(|(k, v)| (*k, *v))
593			.collect::<Vec<(_, _)>>()
594			.iter()
595			.for_each(|(peer_id, direction)| {
596				// Update counters in the loop for `drop_connection` to report the correct number.
597				match direction {
598					Direction::Inbound => self.num_in -= 1,
599					Direction::Outbound => self.num_out -= 1,
600				}
601				self.drop_connection(*peer_id)
602			});
603		self.nodes.clear();
604	}
605
606	/// Get the list of reserved peers.
607	fn on_get_reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
608		let _ = pending_response.send(self.reserved_nodes.keys().cloned().collect());
609	}
610
611	/// Disconnect the peer.
612	fn on_disconnect_peer(&mut self, peer_id: PeerId) {
613		// Don't do anything if the node is reserved.
614		if self.reserved_nodes.contains_key(&peer_id) {
615			debug!(
616				target: LOG_TARGET,
617				"Ignoring request to disconnect reserved peer {peer_id} from {:?}.", self.set_id,
618			);
619			return;
620		}
621
622		match self.nodes.remove(&peer_id) {
623			Some(direction) => {
624				trace!(
625					target: LOG_TARGET,
626					"Disconnecting peer {peer_id} ({direction:?}) from {:?}.",
627					self.set_id
628				);
629				match direction {
630					Direction::Inbound => self.num_in -= 1,
631					Direction::Outbound => self.num_out -= 1,
632				}
633				self.drop_connection(peer_id);
634			},
635			None => {
636				debug!(
637					target: LOG_TARGET,
638					"Trying to disconnect unknown peer {peer_id} from {:?}.", self.set_id,
639				);
640			},
641		}
642	}
643
644	/// Indicate that we received an incoming connection. Must be answered either with
645	/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
646	///
647	/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
648	/// connection implicitly means `Connect`, but incoming connections aren't cancelled by
649	/// `dropped`.
650	// Implementation note: because of concurrency issues, `ProtocolController` has an imperfect
651	// view of the peers' states, and may issue commands for a peer after `Notifications` received
652	// an incoming request for that peer. In this case, `Notifications` ignores all the commands
653	// until it receives a response for the incoming request to `ProtocolController`, so we must
654	// ensure we handle this incoming request correctly.
655	fn on_incoming_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
656		trace!(
657			target: LOG_TARGET,
658			"Incoming connection from peer {peer_id} ({incoming_index:?}) on {:?}.",
659			self.set_id,
660		);
661
662		if self.reserved_only && !self.reserved_nodes.contains_key(&peer_id) {
663			self.reject_connection(peer_id, incoming_index);
664			return;
665		}
666
667		// Check if the node is reserved first.
668		if let Some(state) = self.reserved_nodes.get_mut(&peer_id) {
669			match state {
670				PeerState::Connected(ref mut direction) => {
671					// We are accepting an incoming connection, so ensure the direction is inbound.
672					// (See the implementation note above.)
673					*direction = Direction::Inbound;
674					self.accept_connection(peer_id, incoming_index);
675				},
676				PeerState::NotConnected => {
677					if self.peer_store.is_banned(&peer_id.into()) {
678						self.reject_connection(peer_id, incoming_index);
679					} else {
680						*state = PeerState::Connected(Direction::Inbound);
681						self.accept_connection(peer_id, incoming_index);
682					}
683				},
684			}
685			return;
686		}
687
688		// If we're already connected, pretend we are not connected and decide on the node again.
689		// (See the note above.)
690		if let Some(direction) = self.nodes.remove(&peer_id) {
691			trace!(
692				target: LOG_TARGET,
693				"Handling incoming connection from peer {} we think we already connected as {:?} on {:?}.",
694				peer_id,
695				direction,
696				self.set_id
697			);
698			match direction {
699				Direction::Inbound => self.num_in -= 1,
700				Direction::Outbound => self.num_out -= 1,
701			}
702		}
703
704		if self.num_in >= self.max_in {
705			self.reject_connection(peer_id, incoming_index);
706			return;
707		}
708
709		if self.is_banned(&peer_id) {
710			self.reject_connection(peer_id, incoming_index);
711			return;
712		}
713
714		self.num_in += 1;
715		self.nodes.insert(peer_id, Direction::Inbound);
716		self.accept_connection(peer_id, incoming_index);
717	}
718
719	/// Indicate that a connection with the peer was dropped.
720	fn on_peer_dropped(&mut self, peer_id: PeerId) {
721		self.on_peer_dropped_inner(peer_id).unwrap_or_else(|peer_id| {
722			// We do not assert here, because due to asynchronous nature of communication
723			// between `ProtocolController` and `Notifications` we can receive `Action::Dropped`
724			// for a peer we already disconnected ourself.
725			trace!(
726				target: LOG_TARGET,
727				"Received `Action::Dropped` for not connected peer {peer_id} on {:?}.",
728				self.set_id,
729			)
730		});
731	}
732
733	/// Indicate that a connection with the peer was dropped.
734	/// Returns `Err(PeerId)` if the peer wasn't connected or is not known to us.
735	fn on_peer_dropped_inner(&mut self, peer_id: PeerId) -> Result<(), PeerId> {
736		if self.drop_reserved_peer(&peer_id)? || self.drop_regular_peer(&peer_id) {
737			// The peer found and disconnected.
738			self.report_disconnect(peer_id);
739			Ok(())
740		} else {
741			// The peer was not found in neither regular or reserved lists.
742			Err(peer_id)
743		}
744	}
745
746	/// Try dropping the peer as a reserved peer. Return `Ok(true)` if the peer was found and
747	/// disconnected, `Ok(false)` if it wasn't found, `Err(PeerId)`, if the peer found, but not in
748	/// connected state.
749	fn drop_reserved_peer(&mut self, peer_id: &PeerId) -> Result<bool, PeerId> {
750		let Some(state) = self.reserved_nodes.get_mut(peer_id) else { return Ok(false) };
751
752		if let PeerState::Connected(direction) = state {
753			trace!(
754				target: LOG_TARGET,
755				"Reserved peer {peer_id} ({direction:?}) dropped from {:?}.",
756				self.set_id,
757			);
758			*state = PeerState::NotConnected;
759			Ok(true)
760		} else {
761			Err(*peer_id)
762		}
763	}
764
765	/// Try dropping the peer as a regular peer. Return `true` if the peer was found and
766	/// disconnected, `false` if it wasn't found.
767	fn drop_regular_peer(&mut self, peer_id: &PeerId) -> bool {
768		let Some(direction) = self.nodes.remove(peer_id) else { return false };
769
770		trace!(
771			target: LOG_TARGET,
772			"Peer {peer_id} ({direction:?}) dropped from {:?}.",
773			self.set_id,
774		);
775
776		match direction {
777			Direction::Inbound => self.num_in -= 1,
778			Direction::Outbound => self.num_out -= 1,
779		}
780
781		true
782	}
783
784	/// Initiate outgoing connections trying to connect all reserved nodes and fill in all outgoing
785	/// slots.
786	fn alloc_slots(&mut self) {
787		// Try connecting to reserved nodes first, ignoring nodes with outstanding events/actions.
788		self.reserved_nodes
789			.iter_mut()
790			.filter_map(|(peer_id, state)| {
791				(!state.is_connected() && !self.peer_store.is_banned(&peer_id.into())).then(|| {
792					*state = PeerState::Connected(Direction::Outbound);
793					peer_id
794				})
795			})
796			.cloned()
797			.collect::<Vec<_>>()
798			.into_iter()
799			.for_each(|peer_id| {
800				self.start_connection(peer_id);
801			});
802
803		// Nothing more to do if we're in reserved-only mode or don't have slots available.
804		if self.reserved_only || self.num_out >= self.max_out {
805			return;
806		}
807
808		// Fill available slots.
809		let available_slots = (self.max_out - self.num_out).saturated_into();
810
811		// Ignore reserved nodes (connected above), already connected nodes, and nodes with
812		// outstanding events/actions.
813		let ignored = self
814			.reserved_nodes
815			.keys()
816			.map(From::from)
817			.collect::<HashSet<sc_network_types::PeerId>>()
818			.union(
819				&self.nodes.keys().map(From::from).collect::<HashSet<sc_network_types::PeerId>>(),
820			)
821			.cloned()
822			.collect();
823
824		let candidates = self
825			.peer_store
826			.outgoing_candidates(available_slots, ignored)
827			.into_iter()
828			.filter_map(|peer_id| {
829				(!self.reserved_nodes.contains_key(&peer_id.into()) &&
830					!self.nodes.contains_key(&peer_id.into()))
831				.then_some(peer_id)
832				.or_else(|| {
833					error!(
834						target: LOG_TARGET,
835						"`PeerStore` returned a node we asked to ignore: {peer_id}.",
836					);
837					debug_assert!(false, "`PeerStore` returned a node we asked to ignore.");
838					None
839				})
840			})
841			.collect::<Vec<_>>();
842
843		if candidates.len() > available_slots {
844			error!(
845				target: LOG_TARGET,
846				"`PeerStore` returned more nodes than there are slots available.",
847			);
848			debug_assert!(false, "`PeerStore` returned more nodes than there are slots available.");
849		}
850
851		candidates.into_iter().take(available_slots).for_each(|peer_id| {
852			self.num_out += 1;
853			self.nodes.insert(peer_id.into(), Direction::Outbound);
854			self.start_connection(peer_id.into());
855		})
856	}
857}
858
859#[cfg(test)]
860mod tests {
861	use super::*;
862	use crate::{
863		peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT},
864		ReputationChange,
865	};
866	use libp2p::PeerId;
867	use sc_network_common::role::ObservedRole;
868	use sc_utils::mpsc::{tracing_unbounded, TryRecvError};
869	use std::collections::HashSet;
870
	// Mock implementation of `PeerStoreProvider`. `mockall` generates `MockPeerStoreHandle`,
	// on which the tests set expectations (e.g. `expect_is_banned`) for the calls the controller
	// makes.
	mockall::mock! {
		#[derive(Debug)]
		pub PeerStoreHandle {}

		impl PeerStoreProvider for PeerStoreHandle {
			fn is_banned(&self, peer_id: &sc_network_types::PeerId) -> bool;
			fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandleT>);
			fn report_disconnect(&self, peer_id: sc_network_types::PeerId);
			fn set_peer_role(&self, peer_id: &sc_network_types::PeerId, role: ObservedRole);
			fn report_peer(&self, peer_id: sc_network_types::PeerId, change: ReputationChange);
			fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32;
			fn peer_role(&self, peer_id: &sc_network_types::PeerId) -> Option<ObservedRole>;
			fn outgoing_candidates(&self, count: usize, ignored: HashSet<sc_network_types::PeerId>) -> Vec<sc_network_types::PeerId>;
			fn add_known_peer(&self, peer_id: sc_network_types::PeerId);
		}
	}
887
888	#[test]
889	fn reserved_nodes_are_connected_dropped_and_accepted() {
890		let reserved1 = PeerId::random();
891		let reserved2 = PeerId::random();
892
893		// Add first reserved node via config.
894		let config = ProtoSetConfig {
895			in_peers: 0,
896			out_peers: 0,
897			reserved_nodes: std::iter::once(reserved1).collect(),
898			reserved_only: true,
899		};
900		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
901
902		let mut peer_store = MockPeerStoreHandle::new();
903		peer_store.expect_register_protocol().once().return_const(());
904		peer_store.expect_is_banned().times(4).return_const(false);
905		peer_store.expect_report_disconnect().times(2).return_const(());
906
907		let (_handle, mut controller) =
908			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
909
910		// Add second reserved node at runtime (this currently calls `alloc_slots` internally).
911		controller.on_add_reserved_peer(reserved2);
912
913		// Initiate connections (currently, `alloc_slots` is also called internally in
914		// `on_add_reserved_peer` above).
915		controller.alloc_slots();
916
917		let mut messages = Vec::new();
918		while let Some(message) = rx.try_recv().ok() {
919			messages.push(message);
920		}
921		assert_eq!(messages.len(), 2);
922		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
923		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
924
925		// Reserved peers do not occupy slots.
926		assert_eq!(controller.num_out, 0);
927		assert_eq!(controller.num_in, 0);
928
929		// Drop connections to be able to accept reserved nodes.
930		controller.on_peer_dropped(reserved1);
931		controller.on_peer_dropped(reserved2);
932
933		// Incoming connection from `reserved1`.
934		let incoming1 = IncomingIndex(1);
935		controller.on_incoming_connection(reserved1, incoming1);
936		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming1));
937		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
938
939		// Incoming connection from `reserved2`.
940		let incoming2 = IncomingIndex(2);
941		controller.on_incoming_connection(reserved2, incoming2);
942		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming2));
943		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
944
945		// Reserved peers do not occupy slots.
946		assert_eq!(controller.num_out, 0);
947		assert_eq!(controller.num_in, 0);
948	}
949
950	#[test]
951	fn banned_reserved_nodes_are_not_connected_and_not_accepted() {
952		let reserved1 = PeerId::random();
953		let reserved2 = PeerId::random();
954
955		// Add first reserved node via config.
956		let config = ProtoSetConfig {
957			in_peers: 0,
958			out_peers: 0,
959			reserved_nodes: std::iter::once(reserved1).collect(),
960			reserved_only: true,
961		};
962		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
963
964		let mut peer_store = MockPeerStoreHandle::new();
965		peer_store.expect_register_protocol().once().return_const(());
966		peer_store.expect_is_banned().times(6).return_const(true);
967
968		let (_handle, mut controller) =
969			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
970
971		// Add second reserved node at runtime (this currently calls `alloc_slots` internally).
972		controller.on_add_reserved_peer(reserved2);
973
974		// Initiate connections.
975		controller.alloc_slots();
976
977		// No slots occupied.
978		assert_eq!(controller.num_out, 0);
979		assert_eq!(controller.num_in, 0);
980
981		// No commands are generated.
982		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
983
984		// Incoming connection from `reserved1`.
985		let incoming1 = IncomingIndex(1);
986		controller.on_incoming_connection(reserved1, incoming1);
987		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming1));
988		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
989
990		// Incoming connection from `reserved2`.
991		let incoming2 = IncomingIndex(2);
992		controller.on_incoming_connection(reserved2, incoming2);
993		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming2));
994		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
995
996		// No slots occupied.
997		assert_eq!(controller.num_out, 0);
998		assert_eq!(controller.num_in, 0);
999	}
1000
1001	#[test]
1002	fn we_try_to_reconnect_to_dropped_reserved_nodes() {
1003		let reserved1 = PeerId::random();
1004		let reserved2 = PeerId::random();
1005
1006		// Add first reserved node via config.
1007		let config = ProtoSetConfig {
1008			in_peers: 0,
1009			out_peers: 0,
1010			reserved_nodes: std::iter::once(reserved1).collect(),
1011			reserved_only: true,
1012		};
1013		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1014
1015		let mut peer_store = MockPeerStoreHandle::new();
1016		peer_store.expect_register_protocol().once().return_const(());
1017		peer_store.expect_is_banned().times(4).return_const(false);
1018		peer_store.expect_report_disconnect().times(2).return_const(());
1019
1020		let (_handle, mut controller) =
1021			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1022
1023		// Add second reserved node at runtime (this calls `alloc_slots` internally).
1024		controller.on_add_reserved_peer(reserved2);
1025
1026		// Initiate connections (actually redundant, see previous comment).
1027		controller.alloc_slots();
1028
1029		let mut messages = Vec::new();
1030		while let Some(message) = rx.try_recv().ok() {
1031			messages.push(message);
1032		}
1033
1034		assert_eq!(messages.len(), 2);
1035		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1036		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1037
1038		// Drop both reserved nodes.
1039		controller.on_peer_dropped(reserved1);
1040		controller.on_peer_dropped(reserved2);
1041
1042		// Initiate connections.
1043		controller.alloc_slots();
1044
1045		let mut messages = Vec::new();
1046		while let Some(message) = rx.try_recv().ok() {
1047			messages.push(message);
1048		}
1049
1050		assert_eq!(messages.len(), 2);
1051		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1052		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1053
1054		// No slots occupied.
1055		assert_eq!(controller.num_out, 0);
1056		assert_eq!(controller.num_in, 0);
1057	}
1058
1059	#[test]
1060	fn nodes_supplied_by_peer_store_are_connected() {
1061		let peer1 = PeerId::random();
1062		let peer2 = PeerId::random();
1063		let candidates = vec![peer1.into(), peer2.into()];
1064
1065		let config = ProtoSetConfig {
1066			in_peers: 0,
1067			// Less slots than candidates.
1068			out_peers: 2,
1069			reserved_nodes: HashSet::new(),
1070			reserved_only: false,
1071		};
1072		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1073
1074		let mut peer_store = MockPeerStoreHandle::new();
1075		peer_store.expect_register_protocol().once().return_const(());
1076		peer_store.expect_outgoing_candidates().once().return_const(candidates);
1077
1078		let (_handle, mut controller) =
1079			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1080
1081		// Initiate connections.
1082		controller.alloc_slots();
1083
1084		let mut messages = Vec::new();
1085		while let Some(message) = rx.try_recv().ok() {
1086			messages.push(message);
1087		}
1088
1089		// Only first two peers are connected (we only have 2 slots).
1090		assert_eq!(messages.len(), 2);
1091		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1092		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1093
1094		// Outgoing slots occupied.
1095		assert_eq!(controller.num_out, 2);
1096		assert_eq!(controller.num_in, 0);
1097
1098		// No more nodes are connected.
1099		controller.alloc_slots();
1100		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1101
1102		// No more slots occupied.
1103		assert_eq!(controller.num_out, 2);
1104		assert_eq!(controller.num_in, 0);
1105	}
1106
1107	#[test]
1108	fn both_reserved_nodes_and_nodes_supplied_by_peer_store_are_connected() {
1109		let reserved1 = PeerId::random();
1110		let reserved2 = PeerId::random();
1111		let regular1 = PeerId::random();
1112		let regular2 = PeerId::random();
1113		let outgoing_candidates = vec![regular1.into(), regular2.into()];
1114		let reserved_nodes = [reserved1, reserved2].iter().cloned().collect();
1115
1116		let config =
1117			ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false };
1118		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1119
1120		let mut peer_store = MockPeerStoreHandle::new();
1121		peer_store.expect_register_protocol().once().return_const(());
1122		peer_store.expect_is_banned().times(2).return_const(false);
1123		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1124
1125		let (_handle, mut controller) =
1126			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1127
1128		// Initiate connections.
1129		controller.alloc_slots();
1130
1131		let mut messages = Vec::new();
1132		while let Some(message) = rx.try_recv().ok() {
1133			messages.push(message);
1134		}
1135		assert_eq!(messages.len(), 4);
1136		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1137		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1138		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1139		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
1140		assert_eq!(controller.num_out, 2);
1141		assert_eq!(controller.num_in, 0);
1142	}
1143
1144	#[test]
1145	fn if_slots_are_freed_we_try_to_allocate_them_again() {
1146		let peer1 = PeerId::random();
1147		let peer2 = PeerId::random();
1148		let peer3 = PeerId::random();
1149		let candidates1 = vec![peer1.into(), peer2.into()];
1150		let candidates2 = vec![peer3.into()];
1151
1152		let config = ProtoSetConfig {
1153			in_peers: 0,
1154			// Less slots than candidates.
1155			out_peers: 2,
1156			reserved_nodes: HashSet::new(),
1157			reserved_only: false,
1158		};
1159		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1160
1161		let mut peer_store = MockPeerStoreHandle::new();
1162		peer_store.expect_register_protocol().once().return_const(());
1163		peer_store.expect_outgoing_candidates().once().return_const(candidates1);
1164		peer_store.expect_outgoing_candidates().once().return_const(candidates2);
1165		peer_store.expect_report_disconnect().times(2).return_const(());
1166
1167		let (_handle, mut controller) =
1168			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1169
1170		// Initiate connections.
1171		controller.alloc_slots();
1172
1173		let mut messages = Vec::new();
1174		while let Some(message) = rx.try_recv().ok() {
1175			messages.push(message);
1176		}
1177
1178		// Only first two peers are connected (we only have 2 slots).
1179		assert_eq!(messages.len(), 2);
1180		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1181		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1182
1183		// Outgoing slots occupied.
1184		assert_eq!(controller.num_out, 2);
1185		assert_eq!(controller.num_in, 0);
1186
1187		// No more nodes are connected.
1188		controller.alloc_slots();
1189		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1190
1191		// No more slots occupied.
1192		assert_eq!(controller.num_out, 2);
1193		assert_eq!(controller.num_in, 0);
1194
1195		// Drop peers.
1196		controller.on_peer_dropped(peer1);
1197		controller.on_peer_dropped(peer2);
1198
1199		// Slots are freed.
1200		assert_eq!(controller.num_out, 0);
1201		assert_eq!(controller.num_in, 0);
1202
1203		// Initiate connections.
1204		controller.alloc_slots();
1205
1206		let mut messages = Vec::new();
1207		while let Some(message) = rx.try_recv().ok() {
1208			messages.push(message);
1209		}
1210
1211		// Peers are connected.
1212		assert_eq!(messages.len(), 1);
1213		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
1214
1215		// Outgoing slots occupied.
1216		assert_eq!(controller.num_out, 1);
1217		assert_eq!(controller.num_in, 0);
1218	}
1219
1220	#[test]
1221	fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() {
1222		let config = ProtoSetConfig {
1223			in_peers: 0,
1224			// Make sure we have slots available.
1225			out_peers: 2,
1226			reserved_nodes: HashSet::new(),
1227			reserved_only: true,
1228		};
1229		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1230
1231		let mut peer_store = MockPeerStoreHandle::new();
1232		peer_store.expect_register_protocol().once().return_const(());
1233
1234		let (_handle, mut controller) =
1235			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1236
1237		// Initiate connections.
1238		controller.alloc_slots();
1239
1240		// No nodes are connected.
1241		assert_eq!(controller.num_out, 0);
1242		assert_eq!(controller.num_in, 0);
1243		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1244	}
1245
1246	#[test]
1247	fn in_reserved_only_mode_no_regular_peers_are_accepted() {
1248		let config = ProtoSetConfig {
1249			// Make sure we have slots available.
1250			in_peers: 2,
1251			out_peers: 0,
1252			reserved_nodes: HashSet::new(),
1253			reserved_only: true,
1254		};
1255		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1256
1257		let mut peer_store = MockPeerStoreHandle::new();
1258		peer_store.expect_register_protocol().once().return_const(());
1259
1260		let (_handle, mut controller) =
1261			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1262
1263		let peer = PeerId::random();
1264		let incoming_index = IncomingIndex(1);
1265		controller.on_incoming_connection(peer, incoming_index);
1266
1267		let mut messages = Vec::new();
1268		while let Some(message) = rx.try_recv().ok() {
1269			messages.push(message);
1270		}
1271
1272		// Peer is rejected.
1273		assert_eq!(messages.len(), 1);
1274		assert!(messages.contains(&Message::Reject(incoming_index)));
1275		assert_eq!(controller.num_out, 0);
1276		assert_eq!(controller.num_in, 0);
1277	}
1278
1279	#[test]
1280	fn disabling_reserved_only_mode_allows_to_connect_to_peers() {
1281		let peer1 = PeerId::random();
1282		let peer2 = PeerId::random();
1283		let candidates = vec![peer1.into(), peer2.into()];
1284
1285		let config = ProtoSetConfig {
1286			in_peers: 0,
1287			// Make sure we have slots available.
1288			out_peers: 10,
1289			reserved_nodes: HashSet::new(),
1290			reserved_only: true,
1291		};
1292		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1293
1294		let mut peer_store = MockPeerStoreHandle::new();
1295		peer_store.expect_register_protocol().once().return_const(());
1296		peer_store.expect_outgoing_candidates().once().return_const(candidates);
1297
1298		let (_handle, mut controller) =
1299			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1300
1301		// Initiate connections.
1302		controller.alloc_slots();
1303
1304		// No nodes are connected.
1305		assert_eq!(controller.num_out, 0);
1306		assert_eq!(controller.num_in, 0);
1307		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1308
1309		// Disable reserved-only mode (this also connects to peers).
1310		controller.on_set_reserved_only(false);
1311
1312		let mut messages = Vec::new();
1313		while let Some(message) = rx.try_recv().ok() {
1314			messages.push(message);
1315		}
1316
1317		assert_eq!(messages.len(), 2);
1318		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1319		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1320		assert_eq!(controller.num_out, 2);
1321		assert_eq!(controller.num_in, 0);
1322	}
1323
1324	#[test]
1325	fn enabling_reserved_only_mode_disconnects_regular_peers() {
1326		let reserved1 = PeerId::random();
1327		let reserved2 = PeerId::random();
1328		let regular1 = PeerId::random();
1329		let regular2 = PeerId::random();
1330		let outgoing_candidates = vec![regular1.into()];
1331
1332		let config = ProtoSetConfig {
1333			in_peers: 10,
1334			out_peers: 10,
1335			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1336			reserved_only: false,
1337		};
1338		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1339
1340		let mut peer_store = MockPeerStoreHandle::new();
1341		peer_store.expect_register_protocol().once().return_const(());
1342		peer_store.expect_is_banned().times(3).return_const(false);
1343		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1344
1345		let (_handle, mut controller) =
1346			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1347		assert_eq!(controller.num_out, 0);
1348		assert_eq!(controller.num_in, 0);
1349
1350		// Connect `regular1` as outbound.
1351		controller.alloc_slots();
1352
1353		let mut messages = Vec::new();
1354		while let Some(message) = rx.try_recv().ok() {
1355			messages.push(message);
1356		}
1357		assert_eq!(messages.len(), 3);
1358		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1359		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1360		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1361		assert_eq!(controller.num_out, 1);
1362		assert_eq!(controller.num_in, 0);
1363
1364		// Connect `regular2` as inbound.
1365		let incoming_index = IncomingIndex(1);
1366		controller.on_incoming_connection(regular2, incoming_index);
1367		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming_index));
1368		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1369		assert_eq!(controller.num_out, 1);
1370		assert_eq!(controller.num_in, 1);
1371
1372		// Switch to reserved-only mode.
1373		controller.on_set_reserved_only(true);
1374
1375		let mut messages = Vec::new();
1376		while let Some(message) = rx.try_recv().ok() {
1377			messages.push(message);
1378		}
1379		assert_eq!(messages.len(), 2);
1380		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular1 }));
1381		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular2 }));
1382		assert_eq!(controller.nodes.len(), 0);
1383		assert_eq!(controller.num_out, 0);
1384		assert_eq!(controller.num_in, 0);
1385	}
1386
1387	#[test]
1388	fn removed_disconnected_reserved_node_is_forgotten() {
1389		let reserved1 = PeerId::random();
1390		let reserved2 = PeerId::random();
1391
1392		let config = ProtoSetConfig {
1393			in_peers: 10,
1394			out_peers: 10,
1395			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1396			reserved_only: false,
1397		};
1398		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1399
1400		let mut peer_store = MockPeerStoreHandle::new();
1401		peer_store.expect_register_protocol().once().return_const(());
1402
1403		let (_handle, mut controller) =
1404			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1405		assert_eq!(controller.reserved_nodes.len(), 2);
1406		assert_eq!(controller.nodes.len(), 0);
1407		assert_eq!(controller.num_out, 0);
1408		assert_eq!(controller.num_in, 0);
1409
1410		controller.on_remove_reserved_peer(reserved1);
1411		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1412		assert_eq!(controller.reserved_nodes.len(), 1);
1413		assert!(!controller.reserved_nodes.contains_key(&reserved1));
1414		assert_eq!(controller.nodes.len(), 0);
1415		assert_eq!(controller.num_out, 0);
1416		assert_eq!(controller.num_in, 0);
1417	}
1418
1419	#[test]
1420	fn removed_connected_reserved_node_is_disconnected_in_reserved_only_mode() {
1421		let reserved1 = PeerId::random();
1422		let reserved2 = PeerId::random();
1423
1424		let config = ProtoSetConfig {
1425			in_peers: 10,
1426			out_peers: 10,
1427			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1428			reserved_only: true,
1429		};
1430		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1431
1432		let mut peer_store = MockPeerStoreHandle::new();
1433		peer_store.expect_register_protocol().once().return_const(());
1434		peer_store.expect_is_banned().times(2).return_const(false);
1435
1436		let (_handle, mut controller) =
1437			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1438
1439		// Initiate connections.
1440		controller.alloc_slots();
1441		let mut messages = Vec::new();
1442		while let Some(message) = rx.try_recv().ok() {
1443			messages.push(message);
1444		}
1445		assert_eq!(messages.len(), 2);
1446		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1447		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1448		assert_eq!(controller.reserved_nodes.len(), 2);
1449		assert!(controller.reserved_nodes.contains_key(&reserved1));
1450		assert!(controller.reserved_nodes.contains_key(&reserved2));
1451		assert!(controller.nodes.is_empty());
1452
1453		// Remove reserved node
1454		controller.on_remove_reserved_peer(reserved1);
1455		assert_eq!(
1456			rx.try_recv().unwrap(),
1457			Message::Drop { set_id: SetId::from(0), peer_id: reserved1 }
1458		);
1459		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1460		assert_eq!(controller.reserved_nodes.len(), 1);
1461		assert!(controller.reserved_nodes.contains_key(&reserved2));
1462		assert!(controller.nodes.is_empty());
1463	}
1464
1465	#[test]
1466	fn removed_connected_reserved_nodes_become_regular_in_non_reserved_mode() {
1467		let peer1 = PeerId::random();
1468		let peer2 = PeerId::random();
1469
1470		let config = ProtoSetConfig {
1471			in_peers: 10,
1472			out_peers: 10,
1473			reserved_nodes: [peer1, peer2].iter().cloned().collect(),
1474			reserved_only: false,
1475		};
1476		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1477
1478		let mut peer_store = MockPeerStoreHandle::new();
1479		peer_store.expect_register_protocol().once().return_const(());
1480		peer_store.expect_is_banned().times(2).return_const(false);
1481		peer_store
1482			.expect_outgoing_candidates()
1483			.once()
1484			.return_const(Vec::<sc_network_types::PeerId>::new());
1485
1486		let (_handle, mut controller) =
1487			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1488
1489		// Connect `peer1` as inbound, `peer2` as outbound.
1490		controller.on_incoming_connection(peer1, IncomingIndex(1));
1491		controller.alloc_slots();
1492		let mut messages = Vec::new();
1493		while let Some(message) = rx.try_recv().ok() {
1494			messages.push(message);
1495		}
1496		assert_eq!(messages.len(), 2);
1497		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1498		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1499		assert_eq!(controller.num_out, 0);
1500		assert_eq!(controller.num_in, 0);
1501
1502		// Remove reserved nodes (and make them regular)
1503		controller.on_remove_reserved_peer(peer1);
1504		controller.on_remove_reserved_peer(peer2);
1505		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1506		assert_eq!(controller.nodes.len(), 2);
1507		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Inbound)));
1508		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Outbound)));
1509		assert_eq!(controller.num_out, 1);
1510		assert_eq!(controller.num_in, 1);
1511	}
1512
1513	#[test]
1514	fn regular_nodes_stop_occupying_slots_when_become_reserved() {
1515		let peer1 = PeerId::random();
1516		let peer2 = PeerId::random();
1517		let outgoing_candidates = vec![peer1.into()];
1518
1519		let config = ProtoSetConfig {
1520			in_peers: 10,
1521			out_peers: 10,
1522			reserved_nodes: HashSet::new(),
1523			reserved_only: false,
1524		};
1525		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1526
1527		let mut peer_store = MockPeerStoreHandle::new();
1528		peer_store.expect_register_protocol().once().return_const(());
1529		peer_store.expect_is_banned().once().return_const(false);
1530		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1531
1532		let (_handle, mut controller) =
1533			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1534
1535		// Connect `peer1` as outbound & `peer2` as inbound.
1536		controller.alloc_slots();
1537		controller.on_incoming_connection(peer2, IncomingIndex(1));
1538		let mut messages = Vec::new();
1539		while let Some(message) = rx.try_recv().ok() {
1540			messages.push(message);
1541		}
1542		assert_eq!(messages.len(), 2);
1543		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1544		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1545		assert_eq!(controller.num_in, 1);
1546		assert_eq!(controller.num_out, 1);
1547
1548		controller.on_add_reserved_peer(peer1);
1549		controller.on_add_reserved_peer(peer2);
1550		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1551		assert_eq!(controller.num_in, 0);
1552		assert_eq!(controller.num_out, 0);
1553	}
1554
1555	#[test]
1556	fn disconnecting_regular_peers_work() {
1557		let peer1 = PeerId::random();
1558		let peer2 = PeerId::random();
1559		let outgoing_candidates = vec![peer1.into()];
1560
1561		let config = ProtoSetConfig {
1562			in_peers: 10,
1563			out_peers: 10,
1564			reserved_nodes: HashSet::new(),
1565			reserved_only: false,
1566		};
1567		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1568
1569		let mut peer_store = MockPeerStoreHandle::new();
1570		peer_store.expect_register_protocol().once().return_const(());
1571		peer_store.expect_is_banned().once().return_const(false);
1572		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1573
1574		let (_handle, mut controller) =
1575			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1576
1577		// Connect `peer1` as outbound & `peer2` as inbound.
1578		controller.alloc_slots();
1579		controller.on_incoming_connection(peer2, IncomingIndex(1));
1580		let mut messages = Vec::new();
1581		while let Some(message) = rx.try_recv().ok() {
1582			messages.push(message);
1583		}
1584		assert_eq!(messages.len(), 2);
1585		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1586		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1587		assert_eq!(controller.nodes.len(), 2);
1588		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1589		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1590		assert_eq!(controller.num_in, 1);
1591		assert_eq!(controller.num_out, 1);
1592
1593		controller.on_disconnect_peer(peer1);
1594		assert_eq!(
1595			rx.try_recv().unwrap(),
1596			Message::Drop { set_id: SetId::from(0), peer_id: peer1 }
1597		);
1598		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1599		assert_eq!(controller.nodes.len(), 1);
1600		assert!(!controller.nodes.contains_key(&peer1));
1601		assert_eq!(controller.num_in, 1);
1602		assert_eq!(controller.num_out, 0);
1603
1604		controller.on_disconnect_peer(peer2);
1605		assert_eq!(
1606			rx.try_recv().unwrap(),
1607			Message::Drop { set_id: SetId::from(0), peer_id: peer2 }
1608		);
1609		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1610		assert_eq!(controller.nodes.len(), 0);
1611		assert_eq!(controller.num_in, 0);
1612		assert_eq!(controller.num_out, 0);
1613	}
1614
1615	#[test]
1616	fn disconnecting_reserved_peers_is_a_noop() {
1617		let reserved1 = PeerId::random();
1618		let reserved2 = PeerId::random();
1619
1620		let config = ProtoSetConfig {
1621			in_peers: 10,
1622			out_peers: 10,
1623			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1624			reserved_only: false,
1625		};
1626		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1627
1628		let mut peer_store = MockPeerStoreHandle::new();
1629		peer_store.expect_register_protocol().once().return_const(());
1630		peer_store.expect_is_banned().times(2).return_const(false);
1631		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1632
1633		let (_handle, mut controller) =
1634			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1635
1636		// Connect `reserved1` as inbound & `reserved2` as outbound.
1637		controller.on_incoming_connection(reserved1, IncomingIndex(1));
1638		controller.alloc_slots();
1639		let mut messages = Vec::new();
1640		while let Some(message) = rx.try_recv().ok() {
1641			messages.push(message);
1642		}
1643		assert_eq!(messages.len(), 2);
1644		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1645		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1646		assert!(matches!(
1647			controller.reserved_nodes.get(&reserved1),
1648			Some(PeerState::Connected(Direction::Inbound))
1649		));
1650		assert!(matches!(
1651			controller.reserved_nodes.get(&reserved2),
1652			Some(PeerState::Connected(Direction::Outbound))
1653		));
1654
1655		controller.on_disconnect_peer(reserved1);
1656		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1657		assert!(matches!(
1658			controller.reserved_nodes.get(&reserved1),
1659			Some(PeerState::Connected(Direction::Inbound))
1660		));
1661
1662		controller.on_disconnect_peer(reserved2);
1663		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1664		assert!(matches!(
1665			controller.reserved_nodes.get(&reserved2),
1666			Some(PeerState::Connected(Direction::Outbound))
1667		));
1668	}
1669
1670	#[test]
1671	fn dropping_regular_peers_work() {
1672		let peer1 = PeerId::random();
1673		let peer2 = PeerId::random();
1674		let outgoing_candidates = vec![peer1.into()];
1675
1676		let config = ProtoSetConfig {
1677			in_peers: 10,
1678			out_peers: 10,
1679			reserved_nodes: HashSet::new(),
1680			reserved_only: false,
1681		};
1682		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1683
1684		let mut peer_store = MockPeerStoreHandle::new();
1685		peer_store.expect_register_protocol().once().return_const(());
1686		peer_store.expect_is_banned().once().return_const(false);
1687		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1688		peer_store.expect_report_disconnect().times(2).return_const(());
1689
1690		let (_handle, mut controller) =
1691			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1692
1693		// Connect `peer1` as outbound & `peer2` as inbound.
1694		controller.alloc_slots();
1695		controller.on_incoming_connection(peer2, IncomingIndex(1));
1696		let mut messages = Vec::new();
1697		while let Some(message) = rx.try_recv().ok() {
1698			messages.push(message);
1699		}
1700		assert_eq!(messages.len(), 2);
1701		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1702		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1703		assert_eq!(controller.nodes.len(), 2);
1704		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1705		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1706		assert_eq!(controller.num_in, 1);
1707		assert_eq!(controller.num_out, 1);
1708
1709		controller.on_peer_dropped(peer1);
1710		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1711		assert_eq!(controller.nodes.len(), 1);
1712		assert!(!controller.nodes.contains_key(&peer1));
1713		assert_eq!(controller.num_in, 1);
1714		assert_eq!(controller.num_out, 0);
1715
1716		controller.on_peer_dropped(peer2);
1717		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1718		assert_eq!(controller.nodes.len(), 0);
1719		assert_eq!(controller.num_in, 0);
1720		assert_eq!(controller.num_out, 0);
1721	}
1722
1723	#[test]
1724	fn incoming_request_for_connected_reserved_node_switches_it_to_inbound() {
1725		let reserved1 = PeerId::random();
1726		let reserved2 = PeerId::random();
1727
1728		let config = ProtoSetConfig {
1729			in_peers: 10,
1730			out_peers: 10,
1731			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1732			reserved_only: false,
1733		};
1734		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1735
1736		let mut peer_store = MockPeerStoreHandle::new();
1737		peer_store.expect_register_protocol().once().return_const(());
1738		peer_store.expect_is_banned().times(2).return_const(false);
1739		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1740
1741		let (_handle, mut controller) =
1742			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1743
1744		// Connect `reserved1` as inbound & `reserved2` as outbound.
1745		controller.on_incoming_connection(reserved1, IncomingIndex(1));
1746		controller.alloc_slots();
1747		let mut messages = Vec::new();
1748		while let Some(message) = rx.try_recv().ok() {
1749			messages.push(message);
1750		}
1751		assert_eq!(messages.len(), 2);
1752		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1753		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1754		assert!(matches!(
1755			controller.reserved_nodes.get(&reserved1),
1756			Some(PeerState::Connected(Direction::Inbound))
1757		));
1758		assert!(matches!(
1759			controller.reserved_nodes.get(&reserved2),
1760			Some(PeerState::Connected(Direction::Outbound))
1761		));
1762
1763		// Incoming request for `reserved1`.
1764		controller.on_incoming_connection(reserved1, IncomingIndex(2));
1765		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
1766		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1767		assert!(matches!(
1768			controller.reserved_nodes.get(&reserved1),
1769			Some(PeerState::Connected(Direction::Inbound))
1770		));
1771
1772		// Incoming request for `reserved2`.
1773		controller.on_incoming_connection(reserved2, IncomingIndex(3));
1774		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(3)));
1775		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1776		assert!(matches!(
1777			controller.reserved_nodes.get(&reserved2),
1778			Some(PeerState::Connected(Direction::Inbound))
1779		));
1780	}
1781
1782	#[test]
1783	fn incoming_request_for_connected_regular_node_switches_it_to_inbound() {
1784		let regular1 = PeerId::random();
1785		let regular2 = PeerId::random();
1786		let outgoing_candidates = vec![regular1.into()];
1787
1788		let config = ProtoSetConfig {
1789			in_peers: 10,
1790			out_peers: 10,
1791			reserved_nodes: HashSet::new(),
1792			reserved_only: false,
1793		};
1794		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1795
1796		let mut peer_store = MockPeerStoreHandle::new();
1797		peer_store.expect_register_protocol().once().return_const(());
1798		peer_store.expect_is_banned().times(3).return_const(false);
1799		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1800
1801		let (_handle, mut controller) =
1802			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1803		assert_eq!(controller.num_out, 0);
1804		assert_eq!(controller.num_in, 0);
1805
1806		// Connect `regular1` as outbound.
1807		controller.alloc_slots();
1808		assert_eq!(
1809			rx.try_recv().ok().unwrap(),
1810			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1811		);
1812		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1813		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1814
1815		// Connect `regular2` as inbound.
1816		controller.on_incoming_connection(regular2, IncomingIndex(0));
1817		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1818		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1819		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1820
1821		// Incoming request for `regular1`.
1822		controller.on_incoming_connection(regular1, IncomingIndex(1));
1823		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(1)));
1824		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1825		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Inbound,));
1826
1827		// Incoming request for `regular2`.
1828		controller.on_incoming_connection(regular2, IncomingIndex(2));
1829		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(2)));
1830		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1831		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1832	}
1833
1834	#[test]
1835	fn incoming_request_for_connected_node_is_rejected_if_its_banned() {
1836		let regular1 = PeerId::random();
1837		let regular2 = PeerId::random();
1838		let outgoing_candidates = vec![regular1.into()];
1839
1840		let config = ProtoSetConfig {
1841			in_peers: 10,
1842			out_peers: 10,
1843			reserved_nodes: HashSet::new(),
1844			reserved_only: false,
1845		};
1846		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1847
1848		let mut peer_store = MockPeerStoreHandle::new();
1849		peer_store.expect_register_protocol().once().return_const(());
1850		peer_store.expect_is_banned().once().return_const(false);
1851		peer_store.expect_is_banned().times(2).return_const(true);
1852		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1853
1854		let (_handle, mut controller) =
1855			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1856		assert_eq!(controller.num_out, 0);
1857		assert_eq!(controller.num_in, 0);
1858
1859		// Connect `regular1` as outbound.
1860		controller.alloc_slots();
1861		assert_eq!(
1862			rx.try_recv().ok().unwrap(),
1863			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1864		);
1865		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1866		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1867
1868		// Connect `regular2` as inbound.
1869		controller.on_incoming_connection(regular2, IncomingIndex(0));
1870		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1871		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1872		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1873
1874		// Incoming request for `regular1`.
1875		controller.on_incoming_connection(regular1, IncomingIndex(1));
1876		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
1877		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1878		assert!(!controller.nodes.contains_key(&regular1));
1879
1880		// Incoming request for `regular2`.
1881		controller.on_incoming_connection(regular2, IncomingIndex(2));
1882		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
1883		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1884		assert!(!controller.nodes.contains_key(&regular2));
1885	}
1886
1887	#[test]
1888	fn incoming_request_for_connected_node_is_rejected_if_no_slots_available() {
1889		let regular1 = PeerId::random();
1890		let regular2 = PeerId::random();
1891		let outgoing_candidates = vec![regular1.into()];
1892
1893		let config = ProtoSetConfig {
1894			in_peers: 1,
1895			out_peers: 1,
1896			reserved_nodes: HashSet::new(),
1897			reserved_only: false,
1898		};
1899		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1900
1901		let mut peer_store = MockPeerStoreHandle::new();
1902		peer_store.expect_register_protocol().once().return_const(());
1903		peer_store.expect_is_banned().once().return_const(false);
1904		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1905
1906		let (_handle, mut controller) =
1907			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1908		assert_eq!(controller.num_out, 0);
1909		assert_eq!(controller.num_in, 0);
1910
1911		// Connect `regular1` as outbound.
1912		controller.alloc_slots();
1913		assert_eq!(
1914			rx.try_recv().ok().unwrap(),
1915			Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
1916		);
1917		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1918		assert!(matches!(controller.nodes.get(&regular1).unwrap(), Direction::Outbound,));
1919
1920		// Connect `regular2` as inbound.
1921		controller.on_incoming_connection(regular2, IncomingIndex(0));
1922		assert_eq!(rx.try_recv().ok().unwrap(), Message::Accept(IncomingIndex(0)));
1923		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1924		assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));
1925
1926		controller.max_in = 0;
1927
1928		// Incoming request for `regular1`.
1929		controller.on_incoming_connection(regular1, IncomingIndex(1));
1930		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(1)));
1931		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1932		assert!(!controller.nodes.contains_key(&regular1));
1933
1934		// Incoming request for `regular2`.
1935		controller.on_incoming_connection(regular2, IncomingIndex(2));
1936		assert_eq!(rx.try_recv().ok().unwrap(), Message::Reject(IncomingIndex(2)));
1937		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1938		assert!(!controller.nodes.contains_key(&regular2));
1939	}
1940
1941	#[test]
1942	fn incoming_peers_that_exceed_slots_are_rejected() {
1943		let peer1 = PeerId::random();
1944		let peer2 = PeerId::random();
1945
1946		let config = ProtoSetConfig {
1947			in_peers: 1,
1948			out_peers: 10,
1949			reserved_nodes: HashSet::new(),
1950			reserved_only: false,
1951		};
1952		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1953
1954		let mut peer_store = MockPeerStoreHandle::new();
1955		peer_store.expect_register_protocol().once().return_const(());
1956		peer_store.expect_is_banned().once().return_const(false);
1957
1958		let (_handle, mut controller) =
1959			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1960
1961		// Connect `peer1` as inbound.
1962		controller.on_incoming_connection(peer1, IncomingIndex(1));
1963		assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(1)));
1964		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1965
1966		// Incoming requests for `peer2`.
1967		controller.on_incoming_connection(peer2, IncomingIndex(2));
1968		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
1969		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1970	}
1971
1972	#[test]
1973	fn banned_regular_incoming_node_is_rejected() {
1974		let peer1 = PeerId::random();
1975
1976		let config = ProtoSetConfig {
1977			in_peers: 10,
1978			out_peers: 10,
1979			reserved_nodes: HashSet::new(),
1980			reserved_only: false,
1981		};
1982		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1983
1984		let mut peer_store = MockPeerStoreHandle::new();
1985		peer_store.expect_register_protocol().once().return_const(());
1986		peer_store.expect_is_banned().once().return_const(true);
1987
1988		let (_handle, mut controller) =
1989			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1990
1991		// Incoming request.
1992		controller.on_incoming_connection(peer1, IncomingIndex(1));
1993		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
1994		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1995	}
1996
1997	#[test]
1998	fn banned_reserved_incoming_node_is_rejected() {
1999		let reserved1 = PeerId::random();
2000
2001		let config = ProtoSetConfig {
2002			in_peers: 10,
2003			out_peers: 10,
2004			reserved_nodes: std::iter::once(reserved1).collect(),
2005			reserved_only: false,
2006		};
2007		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
2008
2009		let mut peer_store = MockPeerStoreHandle::new();
2010		peer_store.expect_register_protocol().once().return_const(());
2011		peer_store.expect_is_banned().once().return_const(true);
2012
2013		let (_handle, mut controller) =
2014			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
2015		assert!(controller.reserved_nodes.contains_key(&reserved1));
2016
2017		// Incoming request.
2018		controller.on_incoming_connection(reserved1, IncomingIndex(1));
2019		assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
2020		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
2021	}
2022
2023	#[test]
2024	fn we_dont_connect_to_banned_reserved_node() {
2025		let reserved1 = PeerId::random();
2026
2027		let config = ProtoSetConfig {
2028			in_peers: 10,
2029			out_peers: 10,
2030			reserved_nodes: std::iter::once(reserved1).collect(),
2031			reserved_only: false,
2032		};
2033		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
2034
2035		let mut peer_store = MockPeerStoreHandle::new();
2036		peer_store.expect_register_protocol().once().return_const(());
2037		peer_store.expect_is_banned().once().return_const(true);
2038		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
2039
2040		let (_handle, mut controller) =
2041			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
2042		assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
2043
2044		// Initiate connections
2045		controller.alloc_slots();
2046		assert!(matches!(controller.reserved_nodes.get(&reserved1), Some(PeerState::NotConnected)));
2047		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
2048	}
2049}