Skip to main content

soil_network/
protocol_controller.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Protocol Controller. Generic implementation of peer management for protocols.
8//! Responsible for accepting/rejecting incoming connections and initiating outgoing connections,
9//! respecting the inbound and outbound peer slot counts. Communicates with `PeerStore` to get and
10//! update peer reputation values and sends commands to `Notifications`.
11//!
//! Due to the asynchronous nature of communication between `ProtocolController` and
//! `Notifications`, `ProtocolController` has an imperfect view of the states of the peers.
//! To reduce this desynchronization, the following measures are taken:
15//!
16//! 1. Network peer events from `Notifications` are prioritized over actions from external API and
17//!    internal actions by `ProtocolController` (like slot allocation).
18//! 2. `Notifications` ignores all commands from `ProtocolController` after sending "incoming"
19//!    request until receiving the answer to this "incoming" request.
20//! 3. After sending a "connect" message, `ProtocolController` switches the state of the peer from
21//!    `Outbound` to `Inbound` if it receives an "incoming" request from `Notifications` for this
22//!    peer.
23//!
24//! These measures do not eliminate confusing commands from `ProtocolController` completely,
25//! so `Notifications` must correctly handle seemingly inconsistent commands, like a "connect"
26//! command for the peer it thinks is already connected, and a "drop" command for a peer that
27//! was previously dropped.
28//!
29//! Even though this does not guarantee that `ProtocolController` and `Notifications` have the same
30//! view of the peers' states at any given moment, the eventual consistency is maintained.
31
32use crate::peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT};
33
34use futures::{channel::oneshot, future::Either, FutureExt, StreamExt};
35use libp2p::PeerId;
36use log::{debug, error, trace, warn};
37use soil_client::utils::mpsc::{
38	tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
39};
40use std::{
41	collections::{HashMap, HashSet},
42	sync::Arc,
43	time::{Duration, Instant},
44};
45use subsoil::arithmetic::traits::SaturatedConversion;
46use wasm_timer::Delay;
47
/// Log target for this file. Passed as `target:` to every logging macro invocation here.
pub const LOG_TARGET: &str = "peerset";
50
/// Index of a protocol inside `Notifications`. The name `SetId` is historical: the value used
/// to refer to a set of peers in a peerset for this protocol.
///
/// Build it from the index of the protocol in `Notifications`, either through the
/// `From<usize>` trait implementation or, in const contexts, through the inherent `from`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SetId(usize);

impl SetId {
	/// Wrap a raw index. Being `const`, this can initialize hardcoded peerset indices.
	pub const fn from(id: usize) -> Self {
		SetId(id)
	}
}

impl From<usize> for SetId {
	/// Wrap a raw protocol index.
	fn from(id: usize) -> Self {
		SetId(id)
	}
}

impl From<SetId> for usize {
	/// Unwrap the raw protocol index.
	fn from(id: SetId) -> Self {
		let SetId(index) = id;
		index
	}
}
77
/// Configuration for a set of nodes for a specific protocol.
///
/// Consumed by [`ProtocolController::new`] to seed the slot limits, the reserved node set and
/// the reserved-only flag.
#[derive(Debug)]
pub struct ProtoSetConfig {
	/// Maximum number of incoming links to peers.
	pub in_peers: u32,

	/// Maximum number of outgoing links to peers.
	pub out_peers: u32,

	/// Lists of nodes we should always be connected to.
	///
	/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
	/// > otherwise it will not be able to connect to them.
	pub reserved_nodes: HashSet<PeerId>,

	/// If true, we only accept nodes in [`ProtoSetConfig::reserved_nodes`].
	pub reserved_only: bool,
}
96
/// Message that is sent by [`ProtocolController`] to `Notifications`.
///
/// `Connect`/`Drop` address a peer by id, while `Accept`/`Reject` answer a previously reported
/// incoming connection identified by its [`IncomingIndex`].
#[derive(Debug, PartialEq)]
pub enum Message {
	/// Request to open a connection to the given peer. From the point of view of the
	/// `ProtocolController`, we are immediately connected.
	Connect {
		/// Set id to connect on.
		set_id: SetId,
		/// Peer to connect to.
		peer_id: PeerId,
	},

	/// Drop the connection to the given peer, or cancel the connection attempt after a `Connect`.
	Drop {
		/// Set id to disconnect on.
		set_id: SetId,
		/// Peer to disconnect from.
		peer_id: PeerId,
	},

	/// Equivalent to `Connect` for the peer corresponding to this incoming index.
	Accept(IncomingIndex),

	/// Equivalent to `Drop` for the peer corresponding to this incoming index.
	Reject(IncomingIndex),
}
123
/// Opaque identifier of an incoming connection. The value is allocated by the network.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct IncomingIndex(pub u64);

impl From<u64> for IncomingIndex {
	/// Wrap a raw incoming connection index.
	fn from(val: u64) -> Self {
		IncomingIndex(val)
	}
}
133
/// External API actions.
///
/// Sent through [`ProtocolHandle`] on the actions channel and dispatched by
/// `ProtocolController::process_action`.
#[derive(Debug)]
enum Action {
	/// Add a reserved peer or mark already connected peer as reserved.
	AddReservedPeer(PeerId),
	/// Remove a reserved peer.
	RemoveReservedPeer(PeerId),
	/// Update reserved peers to match the provided set.
	SetReservedPeers(HashSet<PeerId>),
	/// Set/unset reserved-only mode.
	SetReservedOnly(bool),
	/// Disconnect a peer.
	DisconnectPeer(PeerId),
	/// Get the list of reserved peers. The list is sent back through the provided channel.
	GetReservedPeers(oneshot::Sender<Vec<PeerId>>),
}
150
/// Network events from `Notifications`.
///
/// Sent through [`ProtocolHandle`] on the events channel and dispatched by
/// `ProtocolController::process_event`.
#[derive(Debug)]
enum Event {
	/// Incoming connection from the peer, identified by the given incoming index.
	IncomingConnection(PeerId, IncomingIndex),
	/// Connection with the peer dropped.
	Dropped(PeerId),
}
159
/// Shared handle to [`ProtocolController`]. Distributed around the code outside of the
/// protocol implementation.
///
/// The handle only holds the sending halves of two unbounded channels; cloning it yields
/// another sender to the same controller.
#[derive(Debug, Clone)]
pub struct ProtocolHandle {
	/// Actions from outer API.
	actions_tx: TracingUnboundedSender<Action>,
	/// Connection events from `Notifications`. We prioritize them over actions.
	events_tx: TracingUnboundedSender<Event>,
}
169
170impl ProtocolHandle {
171	/// Adds a new reserved peer. [`ProtocolController`] will make an effort
172	/// to always remain connected to this peer.
173	///
174	/// Has no effect if the node was already a reserved peer.
175	///
176	/// > **Note**: Keep in mind that the networking has to know an address for this node,
177	/// > otherwise it will not be able to connect to it.
178	pub fn add_reserved_peer(&self, peer_id: PeerId) {
179		let _ = self.actions_tx.unbounded_send(Action::AddReservedPeer(peer_id));
180	}
181
182	/// Demotes reserved peer to non-reserved. Does not disconnect the peer.
183	///
184	/// Has no effect if the node was not a reserved peer.
185	pub fn remove_reserved_peer(&self, peer_id: PeerId) {
186		let _ = self.actions_tx.unbounded_send(Action::RemoveReservedPeer(peer_id));
187	}
188
189	/// Set reserved peers to the new set.
190	pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
191		let _ = self.actions_tx.unbounded_send(Action::SetReservedPeers(peer_ids));
192	}
193
194	/// Sets whether or not [`ProtocolController`] only has connections with nodes marked
195	/// as reserved for the given set.
196	pub fn set_reserved_only(&self, reserved: bool) {
197		let _ = self.actions_tx.unbounded_send(Action::SetReservedOnly(reserved));
198	}
199
200	/// Disconnect peer. You should remove the `PeerId` from the `PeerStore` first
201	/// to not connect to the peer again during the next slot allocation.
202	pub fn disconnect_peer(&self, peer_id: PeerId) {
203		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id));
204	}
205
206	/// Get the list of reserved peers.
207	pub fn reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
208		let _ = self.actions_tx.unbounded_send(Action::GetReservedPeers(pending_response));
209	}
210
211	/// Notify about incoming connection. [`ProtocolController`] will either accept or reject it.
212	pub fn incoming_connection(&self, peer_id: PeerId, incoming_index: IncomingIndex) {
213		let _ = self
214			.events_tx
215			.unbounded_send(Event::IncomingConnection(peer_id, incoming_index));
216	}
217
218	/// Notify that connection was dropped (either refused or disconnected).
219	pub fn dropped(&self, peer_id: PeerId) {
220		let _ = self.events_tx.unbounded_send(Event::Dropped(peer_id));
221	}
222}
223
224impl ProtocolHandleT for ProtocolHandle {
225	fn disconnect_peer(&self, peer_id: crate::types::PeerId) {
226		let _ = self.actions_tx.unbounded_send(Action::DisconnectPeer(peer_id.into()));
227	}
228}
229
/// Direction of a connection.
#[derive(Clone, Copy, Debug)]
enum Direction {
	/// The connection was accepted from an incoming request of the remote peer.
	Inbound,
	/// The connection was initiated by us.
	Outbound,
}

/// Status of a connection with a peer.
///
/// The default state is [`PeerState::NotConnected`].
#[derive(Clone, Debug, Default)]
enum PeerState {
	/// We are connected to the peer.
	Connected(Direction),
	/// We are not connected.
	#[default]
	NotConnected,
}

impl PeerState {
	/// Returns true if we are connected with the node.
	fn is_connected(&self) -> bool {
		matches!(self, PeerState::Connected(_))
	}
}
258
/// Worker side of [`ProtocolHandle`] responsible for all the logic.
///
/// Regular peers live in `nodes` and are counted in `num_in`/`num_out`; reserved peers live in
/// `reserved_nodes` and are never counted against the slot limits.
#[derive(Debug)]
pub struct ProtocolController {
	/// Set id to use when sending connect/drop requests to `Notifications`.
	// Will likely be replaced by `ProtocolName` in the future.
	set_id: SetId,
	/// Receiver for outer API messages from [`ProtocolHandle`].
	actions_rx: TracingUnboundedReceiver<Action>,
	/// Receiver for connection events from `Notifications` sent via [`ProtocolHandle`].
	events_rx: TracingUnboundedReceiver<Event>,
	/// Number of occupied slots for incoming connections (not counting reserved nodes).
	num_in: u32,
	/// Number of occupied slots for outgoing connections (not counting reserved nodes).
	num_out: u32,
	/// Maximum number of slots for incoming connections (not counting reserved nodes).
	max_in: u32,
	/// Maximum number of slots for outgoing connections (not counting reserved nodes).
	max_out: u32,
	/// Connected regular nodes, keyed by peer id, with the direction of each connection.
	nodes: HashMap<PeerId, Direction>,
	/// Reserved nodes. Should be always connected and do not occupy peer slots.
	reserved_nodes: HashMap<PeerId, PeerState>,
	/// Connect only to reserved nodes.
	reserved_only: bool,
	/// Next time to allocate slots. This is done once per second.
	next_periodic_alloc_slots: Instant,
	/// Outgoing channel for messages to `Notifications`.
	to_notifications: TracingUnboundedSender<Message>,
	/// `PeerStore` handle for checking peer reputation values and getting connection candidates
	/// with highest reputation.
	peer_store: Arc<dyn PeerStoreProvider>,
}
291
292impl ProtocolController {
293	/// Construct new [`ProtocolController`].
294	pub fn new(
295		set_id: SetId,
296		config: ProtoSetConfig,
297		to_notifications: TracingUnboundedSender<Message>,
298		peer_store: Arc<dyn PeerStoreProvider>,
299	) -> (ProtocolHandle, ProtocolController) {
300		let (actions_tx, actions_rx) = tracing_unbounded("mpsc_api_protocol", 10_000);
301		let (events_tx, events_rx) = tracing_unbounded("mpsc_notifications_protocol", 10_000);
302		let handle = ProtocolHandle { actions_tx, events_tx };
303		peer_store.register_protocol(Arc::new(handle.clone()));
304		let reserved_nodes =
305			config.reserved_nodes.iter().map(|p| (*p, PeerState::NotConnected)).collect();
306		let controller = ProtocolController {
307			set_id,
308			actions_rx,
309			events_rx,
310			num_in: 0,
311			num_out: 0,
312			max_in: config.in_peers,
313			max_out: config.out_peers,
314			nodes: HashMap::new(),
315			reserved_nodes,
316			reserved_only: config.reserved_only,
317			next_periodic_alloc_slots: Instant::now(),
318			to_notifications,
319			peer_store,
320		};
321		(handle, controller)
322	}
323
324	/// Drive [`ProtocolController`]. This function returns when all instances of
325	/// [`ProtocolHandle`] are dropped.
326	pub async fn run(mut self) {
327		while self.next_action().await {}
328	}
329
	/// Perform one action. Returns `true` if it should be called again.
	///
	/// Waits until either an event or an action is received, running the periodic slot
	/// allocation whenever its timer fires in the meantime. Returns `false` as soon as the event
	/// or the action channel yields `None`.
	///
	/// Intended for tests only. Use `run` for driving [`ProtocolController`].
	pub async fn next_action(&mut self) -> bool {
		let either = loop {
			// The timer is recreated on every iteration from the stored deadline.
			let mut next_alloc_slots = Delay::new_at(self.next_periodic_alloc_slots).fuse();

			// See the module doc for why we use `select_biased!`.
			futures::select_biased! {
				event = self.events_rx.next() => match event {
					Some(event) => break Either::Left(event),
					None => return false,
				},
				action = self.actions_rx.next() => match action {
					Some(action) => break Either::Right(action),
					None => return false,
				},
				_ = next_alloc_slots => {
					// Timer fired: allocate slots, re-arm for one second from now, and keep
					// waiting for an event or an action.
					self.alloc_slots();
					self.next_periodic_alloc_slots = Instant::now() + Duration::new(1, 0);
				},
			}
		};

		// Processing happens outside of the `select_biased!` loop above.
		match either {
			Either::Left(event) => self.process_event(event),
			Either::Right(action) => self.process_action(action),
		}

		true
	}
361
362	/// Process connection event.
363	fn process_event(&mut self, event: Event) {
364		match event {
365			Event::IncomingConnection(peer_id, index) => {
366				self.on_incoming_connection(peer_id, index)
367			},
368			Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
369		}
370	}
371
372	/// Process action command.
373	fn process_action(&mut self, action: Action) {
374		match action {
375			Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
376			Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
377			Action::SetReservedPeers(peer_ids) => self.on_set_reserved_peers(peer_ids),
378			Action::SetReservedOnly(reserved_only) => self.on_set_reserved_only(reserved_only),
379			Action::DisconnectPeer(peer_id) => self.on_disconnect_peer(peer_id),
380			Action::GetReservedPeers(pending_response) => {
381				self.on_get_reserved_peers(pending_response)
382			},
383		}
384	}
385
386	/// Send "accept" message to `Notifications`.
387	fn accept_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
388		trace!(
389			target: LOG_TARGET,
390			"Accepting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
391			self.set_id,
392			self.num_in,
393			self.max_in,
394		);
395
396		let _ = self.to_notifications.unbounded_send(Message::Accept(incoming_index));
397	}
398
399	/// Send "reject" message to `Notifications`.
400	fn reject_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
401		trace!(
402			target: LOG_TARGET,
403			"Rejecting {peer_id} ({incoming_index:?}) on {:?} ({}/{} num_in/max_in).",
404			self.set_id,
405			self.num_in,
406			self.max_in,
407		);
408
409		let _ = self.to_notifications.unbounded_send(Message::Reject(incoming_index));
410	}
411
412	/// Send "connect" message to `Notifications`.
413	fn start_connection(&mut self, peer_id: PeerId) {
414		trace!(
415			target: LOG_TARGET,
416			"Connecting to {peer_id} on {:?} ({}/{} num_out/max_out).",
417			self.set_id,
418			self.num_out,
419			self.max_out,
420		);
421
422		let _ = self
423			.to_notifications
424			.unbounded_send(Message::Connect { set_id: self.set_id, peer_id });
425	}
426
427	/// Send "drop" message to `Notifications`.
428	fn drop_connection(&mut self, peer_id: PeerId) {
429		trace!(
430			target: LOG_TARGET,
431			"Dropping {peer_id} on {:?} ({}/{} num_in/max_in, {}/{} num_out/max_out).",
432			self.set_id,
433			self.num_in,
434			self.max_in,
435			self.num_out,
436			self.max_out,
437		);
438
439		let _ = self
440			.to_notifications
441			.unbounded_send(Message::Drop { set_id: self.set_id, peer_id });
442	}
443
444	/// Report peer disconnect event to `PeerStore` for it to update peer's reputation accordingly.
445	/// Should only be called if the remote node disconnected us, not the other way around.
446	fn report_disconnect(&mut self, peer_id: PeerId) {
447		self.peer_store.report_disconnect(peer_id.into());
448	}
449
450	/// Ask `Peerset` if the peer has a reputation value not sufficient for connection with it.
451	fn is_banned(&self, peer_id: &PeerId) -> bool {
452		self.peer_store.is_banned(&peer_id.into())
453	}
454
455	/// Add the peer to the set of reserved peers. [`ProtocolController`] will try to always
456	/// maintain connections with such peers.
457	fn on_add_reserved_peer(&mut self, peer_id: PeerId) {
458		if self.reserved_nodes.contains_key(&peer_id) {
459			debug!(
460				target: LOG_TARGET,
461				"Trying to add an already reserved node {peer_id} as reserved on {:?}.",
462				self.set_id,
463			);
464			return;
465		}
466
467		// Get the peer out of non-reserved peers if it's there.
468		let state = match self.nodes.remove(&peer_id) {
469			Some(direction) => {
470				trace!(
471					target: LOG_TARGET,
472					"Marking previously connected node {} ({:?}) as reserved on {:?}.",
473					peer_id,
474					direction,
475					self.set_id
476				);
477				PeerState::Connected(direction)
478			},
479			None => {
480				trace!(target: LOG_TARGET, "Adding reserved node {peer_id} on {:?}.", self.set_id,);
481				PeerState::NotConnected
482			},
483		};
484
485		self.reserved_nodes.insert(peer_id, state.clone());
486
487		// Discount occupied slots or connect to the node.
488		match state {
489			PeerState::Connected(Direction::Inbound) => self.num_in -= 1,
490			PeerState::Connected(Direction::Outbound) => self.num_out -= 1,
491			PeerState::NotConnected => self.alloc_slots(),
492		}
493	}
494
495	/// Remove the peer from the set of reserved peers. The peer is either moved to the set of
496	/// regular nodes or disconnected.
497	fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
498		let state = match self.reserved_nodes.remove(&peer_id) {
499			Some(state) => state,
500			None => {
501				warn!(
502					target: LOG_TARGET,
503					"Trying to remove unknown reserved node {peer_id} from {:?}.", self.set_id,
504				);
505				return;
506			},
507		};
508
509		if let PeerState::Connected(direction) = state {
510			// Disconnect if we're at (or over) the regular node limit
511			let disconnect = self.reserved_only
512				|| match direction {
513					Direction::Inbound => self.num_in >= self.max_in,
514					Direction::Outbound => self.num_out >= self.max_out,
515				};
516
517			if disconnect {
518				// Disconnect the node.
519				trace!(
520					target: LOG_TARGET,
521					"Disconnecting previously reserved node {peer_id} ({direction:?}) on {:?}.",
522					self.set_id,
523				);
524				self.drop_connection(peer_id);
525			} else {
526				// Count connections as of regular node.
527				trace!(
528					target: LOG_TARGET,
529					"Making a connected reserved node {peer_id} ({:?}) on {:?} a regular one.",
530					direction,
531					self.set_id,
532				);
533
534				match direction {
535					Direction::Inbound => self.num_in += 1,
536					Direction::Outbound => self.num_out += 1,
537				}
538
539				// Put the node into the list of regular nodes.
540				let prev = self.nodes.insert(peer_id, direction);
541				assert!(prev.is_none(), "Corrupted state: reserved node was also non-reserved.");
542			}
543		} else {
544			trace!(
545				target: LOG_TARGET,
546				"Removed disconnected reserved node {peer_id} from {:?}.",
547				self.set_id,
548			);
549		}
550	}
551
552	/// Replace the set of reserved peers.
553	fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
554		// Determine the difference between the current group and the new list.
555		let current = self.reserved_nodes.keys().cloned().collect();
556		let to_insert = peer_ids.difference(&current).cloned().collect::<Vec<_>>();
557		let to_remove = current.difference(&peer_ids).cloned().collect::<Vec<_>>();
558
559		for node in to_insert {
560			self.on_add_reserved_peer(node);
561		}
562
563		for node in to_remove {
564			self.on_remove_reserved_peer(node);
565		}
566	}
567
568	/// Change "reserved only" flag. In "reserved only" mode we connect and accept connections to
569	/// reserved nodes only.
570	fn on_set_reserved_only(&mut self, reserved_only: bool) {
571		trace!(target: LOG_TARGET, "Set reserved only to `{reserved_only}` on {:?}", self.set_id);
572
573		self.reserved_only = reserved_only;
574
575		if !reserved_only {
576			return self.alloc_slots();
577		}
578
579		// Disconnect all non-reserved peers.
580		self.nodes
581			.iter()
582			.map(|(k, v)| (*k, *v))
583			.collect::<Vec<(_, _)>>()
584			.iter()
585			.for_each(|(peer_id, direction)| {
586				// Update counters in the loop for `drop_connection` to report the correct number.
587				match direction {
588					Direction::Inbound => self.num_in -= 1,
589					Direction::Outbound => self.num_out -= 1,
590				}
591				self.drop_connection(*peer_id)
592			});
593		self.nodes.clear();
594	}
595
596	/// Get the list of reserved peers.
597	fn on_get_reserved_peers(&self, pending_response: oneshot::Sender<Vec<PeerId>>) {
598		let _ = pending_response.send(self.reserved_nodes.keys().cloned().collect());
599	}
600
601	/// Disconnect the peer.
602	fn on_disconnect_peer(&mut self, peer_id: PeerId) {
603		// Don't do anything if the node is reserved.
604		if self.reserved_nodes.contains_key(&peer_id) {
605			debug!(
606				target: LOG_TARGET,
607				"Ignoring request to disconnect reserved peer {peer_id} from {:?}.", self.set_id,
608			);
609			return;
610		}
611
612		match self.nodes.remove(&peer_id) {
613			Some(direction) => {
614				trace!(
615					target: LOG_TARGET,
616					"Disconnecting peer {peer_id} ({direction:?}) from {:?}.",
617					self.set_id
618				);
619				match direction {
620					Direction::Inbound => self.num_in -= 1,
621					Direction::Outbound => self.num_out -= 1,
622				}
623				self.drop_connection(peer_id);
624			},
625			None => {
626				debug!(
627					target: LOG_TARGET,
628					"Trying to disconnect unknown peer {peer_id} from {:?}.", self.set_id,
629				);
630			},
631		}
632	}
633
634	/// Indicate that we received an incoming connection. Must be answered either with
635	/// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
636	///
637	/// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming
638	/// connection implicitly means `Connect`, but incoming connections aren't cancelled by
639	/// `dropped`.
640	// Implementation note: because of concurrency issues, `ProtocolController` has an imperfect
641	// view of the peers' states, and may issue commands for a peer after `Notifications` received
642	// an incoming request for that peer. In this case, `Notifications` ignores all the commands
643	// until it receives a response for the incoming request to `ProtocolController`, so we must
644	// ensure we handle this incoming request correctly.
645	fn on_incoming_connection(&mut self, peer_id: PeerId, incoming_index: IncomingIndex) {
646		trace!(
647			target: LOG_TARGET,
648			"Incoming connection from peer {peer_id} ({incoming_index:?}) on {:?}.",
649			self.set_id,
650		);
651
652		if self.reserved_only && !self.reserved_nodes.contains_key(&peer_id) {
653			self.reject_connection(peer_id, incoming_index);
654			return;
655		}
656
657		// Check if the node is reserved first.
658		if let Some(state) = self.reserved_nodes.get_mut(&peer_id) {
659			match state {
660				PeerState::Connected(ref mut direction) => {
661					// We are accepting an incoming connection, so ensure the direction is inbound.
662					// (See the implementation note above.)
663					*direction = Direction::Inbound;
664					self.accept_connection(peer_id, incoming_index);
665				},
666				PeerState::NotConnected => {
667					if self.peer_store.is_banned(&peer_id.into()) {
668						self.reject_connection(peer_id, incoming_index);
669					} else {
670						*state = PeerState::Connected(Direction::Inbound);
671						self.accept_connection(peer_id, incoming_index);
672					}
673				},
674			}
675			return;
676		}
677
678		// If we're already connected, pretend we are not connected and decide on the node again.
679		// (See the note above.)
680		if let Some(direction) = self.nodes.remove(&peer_id) {
681			trace!(
682				target: LOG_TARGET,
683				"Handling incoming connection from peer {} we think we already connected as {:?} on {:?}.",
684				peer_id,
685				direction,
686				self.set_id
687			);
688			match direction {
689				Direction::Inbound => self.num_in -= 1,
690				Direction::Outbound => self.num_out -= 1,
691			}
692		}
693
694		if self.num_in >= self.max_in {
695			self.reject_connection(peer_id, incoming_index);
696			return;
697		}
698
699		if self.is_banned(&peer_id) {
700			self.reject_connection(peer_id, incoming_index);
701			return;
702		}
703
704		self.num_in += 1;
705		self.nodes.insert(peer_id, Direction::Inbound);
706		self.accept_connection(peer_id, incoming_index);
707	}
708
709	/// Indicate that a connection with the peer was dropped.
710	fn on_peer_dropped(&mut self, peer_id: PeerId) {
711		self.on_peer_dropped_inner(peer_id).unwrap_or_else(|peer_id| {
712			// We do not assert here, because due to asynchronous nature of communication
713			// between `ProtocolController` and `Notifications` we can receive `Action::Dropped`
714			// for a peer we already disconnected ourself.
715			trace!(
716				target: LOG_TARGET,
717				"Received `Action::Dropped` for not connected peer {peer_id} on {:?}.",
718				self.set_id,
719			)
720		});
721	}
722
723	/// Indicate that a connection with the peer was dropped.
724	/// Returns `Err(PeerId)` if the peer wasn't connected or is not known to us.
725	fn on_peer_dropped_inner(&mut self, peer_id: PeerId) -> Result<(), PeerId> {
726		if self.drop_reserved_peer(&peer_id)? || self.drop_regular_peer(&peer_id) {
727			// The peer found and disconnected.
728			self.report_disconnect(peer_id);
729			Ok(())
730		} else {
731			// The peer was not found in neither regular or reserved lists.
732			Err(peer_id)
733		}
734	}
735
736	/// Try dropping the peer as a reserved peer. Return `Ok(true)` if the peer was found and
737	/// disconnected, `Ok(false)` if it wasn't found, `Err(PeerId)`, if the peer found, but not in
738	/// connected state.
739	fn drop_reserved_peer(&mut self, peer_id: &PeerId) -> Result<bool, PeerId> {
740		let Some(state) = self.reserved_nodes.get_mut(peer_id) else { return Ok(false) };
741
742		if let PeerState::Connected(direction) = state {
743			trace!(
744				target: LOG_TARGET,
745				"Reserved peer {peer_id} ({direction:?}) dropped from {:?}.",
746				self.set_id,
747			);
748			*state = PeerState::NotConnected;
749			Ok(true)
750		} else {
751			Err(*peer_id)
752		}
753	}
754
755	/// Try dropping the peer as a regular peer. Return `true` if the peer was found and
756	/// disconnected, `false` if it wasn't found.
757	fn drop_regular_peer(&mut self, peer_id: &PeerId) -> bool {
758		let Some(direction) = self.nodes.remove(peer_id) else { return false };
759
760		trace!(
761			target: LOG_TARGET,
762			"Peer {peer_id} ({direction:?}) dropped from {:?}.",
763			self.set_id,
764		);
765
766		match direction {
767			Direction::Inbound => self.num_in -= 1,
768			Direction::Outbound => self.num_out -= 1,
769		}
770
771		true
772	}
773
774	/// Initiate outgoing connections trying to connect all reserved nodes and fill in all outgoing
775	/// slots.
776	fn alloc_slots(&mut self) {
777		// Try connecting to reserved nodes first, ignoring nodes with outstanding events/actions.
778		self.reserved_nodes
779			.iter_mut()
780			.filter_map(|(peer_id, state)| {
781				(!state.is_connected() && !self.peer_store.is_banned(&peer_id.into())).then(|| {
782					*state = PeerState::Connected(Direction::Outbound);
783					peer_id
784				})
785			})
786			.cloned()
787			.collect::<Vec<_>>()
788			.into_iter()
789			.for_each(|peer_id| {
790				self.start_connection(peer_id);
791			});
792
793		// Nothing more to do if we're in reserved-only mode or don't have slots available.
794		if self.reserved_only || self.num_out >= self.max_out {
795			return;
796		}
797
798		// Fill available slots.
799		let available_slots = (self.max_out - self.num_out).saturated_into();
800
801		// Ignore reserved nodes (connected above), already connected nodes, and nodes with
802		// outstanding events/actions.
803		let ignored = self
804			.reserved_nodes
805			.keys()
806			.map(From::from)
807			.collect::<HashSet<crate::types::PeerId>>()
808			.union(&self.nodes.keys().map(From::from).collect::<HashSet<crate::types::PeerId>>())
809			.cloned()
810			.collect();
811
812		let candidates = self
813			.peer_store
814			.outgoing_candidates(available_slots, ignored)
815			.into_iter()
816			.filter_map(|peer_id| {
817				(!self.reserved_nodes.contains_key(&peer_id.into())
818					&& !self.nodes.contains_key(&peer_id.into()))
819				.then_some(peer_id)
820				.or_else(|| {
821					error!(
822						target: LOG_TARGET,
823						"`PeerStore` returned a node we asked to ignore: {peer_id}.",
824					);
825					debug_assert!(false, "`PeerStore` returned a node we asked to ignore.");
826					None
827				})
828			})
829			.collect::<Vec<_>>();
830
831		if candidates.len() > available_slots {
832			error!(
833				target: LOG_TARGET,
834				"`PeerStore` returned more nodes than there are slots available.",
835			);
836			debug_assert!(false, "`PeerStore` returned more nodes than there are slots available.");
837		}
838
839		candidates.into_iter().take(available_slots).for_each(|peer_id| {
840			self.num_out += 1;
841			self.nodes.insert(peer_id.into(), Direction::Outbound);
842			self.start_connection(peer_id.into());
843		})
844	}
845}
846
847#[cfg(test)]
848mod tests {
849	use super::*;
850	use crate::common::role::ObservedRole;
851	use crate::{
852		peer_store::{PeerStoreProvider, ProtocolHandle as ProtocolHandleT},
853		ReputationChange,
854	};
855	use libp2p::PeerId;
856	use soil_client::utils::mpsc::{tracing_unbounded, TryRecvError};
857	use std::collections::HashSet;
858
	// Mock implementation of `PeerStoreProvider`, generated by `mockall` as
	// `MockPeerStoreHandle`. The tests set expectations on it (e.g. `expect_is_banned`).
	mockall::mock! {
		#[derive(Debug)]
		pub PeerStoreHandle {}

		impl PeerStoreProvider for PeerStoreHandle {
			fn is_banned(&self, peer_id: &crate::types::PeerId) -> bool;
			fn register_protocol(&self, protocol_handle: Arc<dyn ProtocolHandleT>);
			fn report_disconnect(&self, peer_id: crate::types::PeerId);
			fn set_peer_role(&self, peer_id: &crate::types::PeerId, role: ObservedRole);
			fn report_peer(&self, peer_id: crate::types::PeerId, change: ReputationChange);
			fn peer_reputation(&self, peer_id: &crate::types::PeerId) -> i32;
			fn peer_role(&self, peer_id: &crate::types::PeerId) -> Option<ObservedRole>;
			fn outgoing_candidates(&self, count: usize, ignored: HashSet<crate::types::PeerId>) -> Vec<crate::types::PeerId>;
			fn add_known_peer(&self, peer_id: crate::types::PeerId);
		}
	}
875
876	#[test]
877	fn reserved_nodes_are_connected_dropped_and_accepted() {
878		let reserved1 = PeerId::random();
879		let reserved2 = PeerId::random();
880
881		// Add first reserved node via config.
882		let config = ProtoSetConfig {
883			in_peers: 0,
884			out_peers: 0,
885			reserved_nodes: std::iter::once(reserved1).collect(),
886			reserved_only: true,
887		};
888		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
889
890		let mut peer_store = MockPeerStoreHandle::new();
891		peer_store.expect_register_protocol().once().return_const(());
892		peer_store.expect_is_banned().times(4).return_const(false);
893		peer_store.expect_report_disconnect().times(2).return_const(());
894
895		let (_handle, mut controller) =
896			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
897
898		// Add second reserved node at runtime (this currently calls `alloc_slots` internally).
899		controller.on_add_reserved_peer(reserved2);
900
901		// Initiate connections (currently, `alloc_slots` is also called internally in
902		// `on_add_reserved_peer` above).
903		controller.alloc_slots();
904
905		let mut messages = Vec::new();
906		while let Some(message) = rx.try_recv().ok() {
907			messages.push(message);
908		}
909		assert_eq!(messages.len(), 2);
910		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
911		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
912
913		// Reserved peers do not occupy slots.
914		assert_eq!(controller.num_out, 0);
915		assert_eq!(controller.num_in, 0);
916
917		// Drop connections to be able to accept reserved nodes.
918		controller.on_peer_dropped(reserved1);
919		controller.on_peer_dropped(reserved2);
920
921		// Incoming connection from `reserved1`.
922		let incoming1 = IncomingIndex(1);
923		controller.on_incoming_connection(reserved1, incoming1);
924		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming1));
925		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
926
927		// Incoming connection from `reserved2`.
928		let incoming2 = IncomingIndex(2);
929		controller.on_incoming_connection(reserved2, incoming2);
930		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming2));
931		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
932
933		// Reserved peers do not occupy slots.
934		assert_eq!(controller.num_out, 0);
935		assert_eq!(controller.num_in, 0);
936	}
937
938	#[test]
939	fn banned_reserved_nodes_are_not_connected_and_not_accepted() {
940		let reserved1 = PeerId::random();
941		let reserved2 = PeerId::random();
942
943		// Add first reserved node via config.
944		let config = ProtoSetConfig {
945			in_peers: 0,
946			out_peers: 0,
947			reserved_nodes: std::iter::once(reserved1).collect(),
948			reserved_only: true,
949		};
950		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
951
952		let mut peer_store = MockPeerStoreHandle::new();
953		peer_store.expect_register_protocol().once().return_const(());
954		peer_store.expect_is_banned().times(6).return_const(true);
955
956		let (_handle, mut controller) =
957			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
958
959		// Add second reserved node at runtime (this currently calls `alloc_slots` internally).
960		controller.on_add_reserved_peer(reserved2);
961
962		// Initiate connections.
963		controller.alloc_slots();
964
965		// No slots occupied.
966		assert_eq!(controller.num_out, 0);
967		assert_eq!(controller.num_in, 0);
968
969		// No commands are generated.
970		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
971
972		// Incoming connection from `reserved1`.
973		let incoming1 = IncomingIndex(1);
974		controller.on_incoming_connection(reserved1, incoming1);
975		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming1));
976		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
977
978		// Incoming connection from `reserved2`.
979		let incoming2 = IncomingIndex(2);
980		controller.on_incoming_connection(reserved2, incoming2);
981		assert_eq!(rx.try_recv().unwrap(), Message::Reject(incoming2));
982		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
983
984		// No slots occupied.
985		assert_eq!(controller.num_out, 0);
986		assert_eq!(controller.num_in, 0);
987	}
988
989	#[test]
990	fn we_try_to_reconnect_to_dropped_reserved_nodes() {
991		let reserved1 = PeerId::random();
992		let reserved2 = PeerId::random();
993
994		// Add first reserved node via config.
995		let config = ProtoSetConfig {
996			in_peers: 0,
997			out_peers: 0,
998			reserved_nodes: std::iter::once(reserved1).collect(),
999			reserved_only: true,
1000		};
1001		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1002
1003		let mut peer_store = MockPeerStoreHandle::new();
1004		peer_store.expect_register_protocol().once().return_const(());
1005		peer_store.expect_is_banned().times(4).return_const(false);
1006		peer_store.expect_report_disconnect().times(2).return_const(());
1007
1008		let (_handle, mut controller) =
1009			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1010
1011		// Add second reserved node at runtime (this calls `alloc_slots` internally).
1012		controller.on_add_reserved_peer(reserved2);
1013
1014		// Initiate connections (actually redundant, see previous comment).
1015		controller.alloc_slots();
1016
1017		let mut messages = Vec::new();
1018		while let Some(message) = rx.try_recv().ok() {
1019			messages.push(message);
1020		}
1021
1022		assert_eq!(messages.len(), 2);
1023		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1024		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1025
1026		// Drop both reserved nodes.
1027		controller.on_peer_dropped(reserved1);
1028		controller.on_peer_dropped(reserved2);
1029
1030		// Initiate connections.
1031		controller.alloc_slots();
1032
1033		let mut messages = Vec::new();
1034		while let Some(message) = rx.try_recv().ok() {
1035			messages.push(message);
1036		}
1037
1038		assert_eq!(messages.len(), 2);
1039		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1040		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1041
1042		// No slots occupied.
1043		assert_eq!(controller.num_out, 0);
1044		assert_eq!(controller.num_in, 0);
1045	}
1046
1047	#[test]
1048	fn nodes_supplied_by_peer_store_are_connected() {
1049		let peer1 = PeerId::random();
1050		let peer2 = PeerId::random();
1051		let candidates = vec![peer1.into(), peer2.into()];
1052
1053		let config = ProtoSetConfig {
1054			in_peers: 0,
1055			// Less slots than candidates.
1056			out_peers: 2,
1057			reserved_nodes: HashSet::new(),
1058			reserved_only: false,
1059		};
1060		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1061
1062		let mut peer_store = MockPeerStoreHandle::new();
1063		peer_store.expect_register_protocol().once().return_const(());
1064		peer_store.expect_outgoing_candidates().once().return_const(candidates);
1065
1066		let (_handle, mut controller) =
1067			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1068
1069		// Initiate connections.
1070		controller.alloc_slots();
1071
1072		let mut messages = Vec::new();
1073		while let Some(message) = rx.try_recv().ok() {
1074			messages.push(message);
1075		}
1076
1077		// Only first two peers are connected (we only have 2 slots).
1078		assert_eq!(messages.len(), 2);
1079		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1080		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1081
1082		// Outgoing slots occupied.
1083		assert_eq!(controller.num_out, 2);
1084		assert_eq!(controller.num_in, 0);
1085
1086		// No more nodes are connected.
1087		controller.alloc_slots();
1088		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1089
1090		// No more slots occupied.
1091		assert_eq!(controller.num_out, 2);
1092		assert_eq!(controller.num_in, 0);
1093	}
1094
1095	#[test]
1096	fn both_reserved_nodes_and_nodes_supplied_by_peer_store_are_connected() {
1097		let reserved1 = PeerId::random();
1098		let reserved2 = PeerId::random();
1099		let regular1 = PeerId::random();
1100		let regular2 = PeerId::random();
1101		let outgoing_candidates = vec![regular1.into(), regular2.into()];
1102		let reserved_nodes = [reserved1, reserved2].iter().cloned().collect();
1103
1104		let config =
1105			ProtoSetConfig { in_peers: 10, out_peers: 10, reserved_nodes, reserved_only: false };
1106		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1107
1108		let mut peer_store = MockPeerStoreHandle::new();
1109		peer_store.expect_register_protocol().once().return_const(());
1110		peer_store.expect_is_banned().times(2).return_const(false);
1111		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1112
1113		let (_handle, mut controller) =
1114			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1115
1116		// Initiate connections.
1117		controller.alloc_slots();
1118
1119		let mut messages = Vec::new();
1120		while let Some(message) = rx.try_recv().ok() {
1121			messages.push(message);
1122		}
1123		assert_eq!(messages.len(), 4);
1124		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1125		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1126		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1127		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular2 }));
1128		assert_eq!(controller.num_out, 2);
1129		assert_eq!(controller.num_in, 0);
1130	}
1131
1132	#[test]
1133	fn if_slots_are_freed_we_try_to_allocate_them_again() {
1134		let peer1 = PeerId::random();
1135		let peer2 = PeerId::random();
1136		let peer3 = PeerId::random();
1137		let candidates1 = vec![peer1.into(), peer2.into()];
1138		let candidates2 = vec![peer3.into()];
1139
1140		let config = ProtoSetConfig {
1141			in_peers: 0,
1142			// Less slots than candidates.
1143			out_peers: 2,
1144			reserved_nodes: HashSet::new(),
1145			reserved_only: false,
1146		};
1147		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1148
1149		let mut peer_store = MockPeerStoreHandle::new();
1150		peer_store.expect_register_protocol().once().return_const(());
1151		peer_store.expect_outgoing_candidates().once().return_const(candidates1);
1152		peer_store.expect_outgoing_candidates().once().return_const(candidates2);
1153		peer_store.expect_report_disconnect().times(2).return_const(());
1154
1155		let (_handle, mut controller) =
1156			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1157
1158		// Initiate connections.
1159		controller.alloc_slots();
1160
1161		let mut messages = Vec::new();
1162		while let Some(message) = rx.try_recv().ok() {
1163			messages.push(message);
1164		}
1165
1166		// Only first two peers are connected (we only have 2 slots).
1167		assert_eq!(messages.len(), 2);
1168		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1169		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1170
1171		// Outgoing slots occupied.
1172		assert_eq!(controller.num_out, 2);
1173		assert_eq!(controller.num_in, 0);
1174
1175		// No more nodes are connected.
1176		controller.alloc_slots();
1177		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1178
1179		// No more slots occupied.
1180		assert_eq!(controller.num_out, 2);
1181		assert_eq!(controller.num_in, 0);
1182
1183		// Drop peers.
1184		controller.on_peer_dropped(peer1);
1185		controller.on_peer_dropped(peer2);
1186
1187		// Slots are freed.
1188		assert_eq!(controller.num_out, 0);
1189		assert_eq!(controller.num_in, 0);
1190
1191		// Initiate connections.
1192		controller.alloc_slots();
1193
1194		let mut messages = Vec::new();
1195		while let Some(message) = rx.try_recv().ok() {
1196			messages.push(message);
1197		}
1198
1199		// Peers are connected.
1200		assert_eq!(messages.len(), 1);
1201		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer3 }));
1202
1203		// Outgoing slots occupied.
1204		assert_eq!(controller.num_out, 1);
1205		assert_eq!(controller.num_in, 0);
1206	}
1207
1208	#[test]
1209	fn in_reserved_only_mode_no_peers_are_requested_from_peer_store_and_connected() {
1210		let config = ProtoSetConfig {
1211			in_peers: 0,
1212			// Make sure we have slots available.
1213			out_peers: 2,
1214			reserved_nodes: HashSet::new(),
1215			reserved_only: true,
1216		};
1217		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1218
1219		let mut peer_store = MockPeerStoreHandle::new();
1220		peer_store.expect_register_protocol().once().return_const(());
1221
1222		let (_handle, mut controller) =
1223			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1224
1225		// Initiate connections.
1226		controller.alloc_slots();
1227
1228		// No nodes are connected.
1229		assert_eq!(controller.num_out, 0);
1230		assert_eq!(controller.num_in, 0);
1231		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1232	}
1233
1234	#[test]
1235	fn in_reserved_only_mode_no_regular_peers_are_accepted() {
1236		let config = ProtoSetConfig {
1237			// Make sure we have slots available.
1238			in_peers: 2,
1239			out_peers: 0,
1240			reserved_nodes: HashSet::new(),
1241			reserved_only: true,
1242		};
1243		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1244
1245		let mut peer_store = MockPeerStoreHandle::new();
1246		peer_store.expect_register_protocol().once().return_const(());
1247
1248		let (_handle, mut controller) =
1249			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1250
1251		let peer = PeerId::random();
1252		let incoming_index = IncomingIndex(1);
1253		controller.on_incoming_connection(peer, incoming_index);
1254
1255		let mut messages = Vec::new();
1256		while let Some(message) = rx.try_recv().ok() {
1257			messages.push(message);
1258		}
1259
1260		// Peer is rejected.
1261		assert_eq!(messages.len(), 1);
1262		assert!(messages.contains(&Message::Reject(incoming_index)));
1263		assert_eq!(controller.num_out, 0);
1264		assert_eq!(controller.num_in, 0);
1265	}
1266
1267	#[test]
1268	fn disabling_reserved_only_mode_allows_to_connect_to_peers() {
1269		let peer1 = PeerId::random();
1270		let peer2 = PeerId::random();
1271		let candidates = vec![peer1.into(), peer2.into()];
1272
1273		let config = ProtoSetConfig {
1274			in_peers: 0,
1275			// Make sure we have slots available.
1276			out_peers: 10,
1277			reserved_nodes: HashSet::new(),
1278			reserved_only: true,
1279		};
1280		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1281
1282		let mut peer_store = MockPeerStoreHandle::new();
1283		peer_store.expect_register_protocol().once().return_const(());
1284		peer_store.expect_outgoing_candidates().once().return_const(candidates);
1285
1286		let (_handle, mut controller) =
1287			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1288
1289		// Initiate connections.
1290		controller.alloc_slots();
1291
1292		// No nodes are connected.
1293		assert_eq!(controller.num_out, 0);
1294		assert_eq!(controller.num_in, 0);
1295		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1296
1297		// Disable reserved-only mode (this also connects to peers).
1298		controller.on_set_reserved_only(false);
1299
1300		let mut messages = Vec::new();
1301		while let Some(message) = rx.try_recv().ok() {
1302			messages.push(message);
1303		}
1304
1305		assert_eq!(messages.len(), 2);
1306		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1307		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1308		assert_eq!(controller.num_out, 2);
1309		assert_eq!(controller.num_in, 0);
1310	}
1311
1312	#[test]
1313	fn enabling_reserved_only_mode_disconnects_regular_peers() {
1314		let reserved1 = PeerId::random();
1315		let reserved2 = PeerId::random();
1316		let regular1 = PeerId::random();
1317		let regular2 = PeerId::random();
1318		let outgoing_candidates = vec![regular1.into()];
1319
1320		let config = ProtoSetConfig {
1321			in_peers: 10,
1322			out_peers: 10,
1323			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1324			reserved_only: false,
1325		};
1326		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1327
1328		let mut peer_store = MockPeerStoreHandle::new();
1329		peer_store.expect_register_protocol().once().return_const(());
1330		peer_store.expect_is_banned().times(3).return_const(false);
1331		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1332
1333		let (_handle, mut controller) =
1334			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1335		assert_eq!(controller.num_out, 0);
1336		assert_eq!(controller.num_in, 0);
1337
1338		// Connect `regular1` as outbound.
1339		controller.alloc_slots();
1340
1341		let mut messages = Vec::new();
1342		while let Some(message) = rx.try_recv().ok() {
1343			messages.push(message);
1344		}
1345		assert_eq!(messages.len(), 3);
1346		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1347		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1348		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: regular1 }));
1349		assert_eq!(controller.num_out, 1);
1350		assert_eq!(controller.num_in, 0);
1351
1352		// Connect `regular2` as inbound.
1353		let incoming_index = IncomingIndex(1);
1354		controller.on_incoming_connection(regular2, incoming_index);
1355		assert_eq!(rx.try_recv().unwrap(), Message::Accept(incoming_index));
1356		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1357		assert_eq!(controller.num_out, 1);
1358		assert_eq!(controller.num_in, 1);
1359
1360		// Switch to reserved-only mode.
1361		controller.on_set_reserved_only(true);
1362
1363		let mut messages = Vec::new();
1364		while let Some(message) = rx.try_recv().ok() {
1365			messages.push(message);
1366		}
1367		assert_eq!(messages.len(), 2);
1368		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular1 }));
1369		assert!(messages.contains(&Message::Drop { set_id: SetId::from(0), peer_id: regular2 }));
1370		assert_eq!(controller.nodes.len(), 0);
1371		assert_eq!(controller.num_out, 0);
1372		assert_eq!(controller.num_in, 0);
1373	}
1374
1375	#[test]
1376	fn removed_disconnected_reserved_node_is_forgotten() {
1377		let reserved1 = PeerId::random();
1378		let reserved2 = PeerId::random();
1379
1380		let config = ProtoSetConfig {
1381			in_peers: 10,
1382			out_peers: 10,
1383			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1384			reserved_only: false,
1385		};
1386		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1387
1388		let mut peer_store = MockPeerStoreHandle::new();
1389		peer_store.expect_register_protocol().once().return_const(());
1390
1391		let (_handle, mut controller) =
1392			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1393		assert_eq!(controller.reserved_nodes.len(), 2);
1394		assert_eq!(controller.nodes.len(), 0);
1395		assert_eq!(controller.num_out, 0);
1396		assert_eq!(controller.num_in, 0);
1397
1398		controller.on_remove_reserved_peer(reserved1);
1399		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1400		assert_eq!(controller.reserved_nodes.len(), 1);
1401		assert!(!controller.reserved_nodes.contains_key(&reserved1));
1402		assert_eq!(controller.nodes.len(), 0);
1403		assert_eq!(controller.num_out, 0);
1404		assert_eq!(controller.num_in, 0);
1405	}
1406
1407	#[test]
1408	fn removed_connected_reserved_node_is_disconnected_in_reserved_only_mode() {
1409		let reserved1 = PeerId::random();
1410		let reserved2 = PeerId::random();
1411
1412		let config = ProtoSetConfig {
1413			in_peers: 10,
1414			out_peers: 10,
1415			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1416			reserved_only: true,
1417		};
1418		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1419
1420		let mut peer_store = MockPeerStoreHandle::new();
1421		peer_store.expect_register_protocol().once().return_const(());
1422		peer_store.expect_is_banned().times(2).return_const(false);
1423
1424		let (_handle, mut controller) =
1425			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1426
1427		// Initiate connections.
1428		controller.alloc_slots();
1429		let mut messages = Vec::new();
1430		while let Some(message) = rx.try_recv().ok() {
1431			messages.push(message);
1432		}
1433		assert_eq!(messages.len(), 2);
1434		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved1 }));
1435		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1436		assert_eq!(controller.reserved_nodes.len(), 2);
1437		assert!(controller.reserved_nodes.contains_key(&reserved1));
1438		assert!(controller.reserved_nodes.contains_key(&reserved2));
1439		assert!(controller.nodes.is_empty());
1440
1441		// Remove reserved node
1442		controller.on_remove_reserved_peer(reserved1);
1443		assert_eq!(
1444			rx.try_recv().unwrap(),
1445			Message::Drop { set_id: SetId::from(0), peer_id: reserved1 }
1446		);
1447		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1448		assert_eq!(controller.reserved_nodes.len(), 1);
1449		assert!(controller.reserved_nodes.contains_key(&reserved2));
1450		assert!(controller.nodes.is_empty());
1451	}
1452
1453	#[test]
1454	fn removed_connected_reserved_nodes_become_regular_in_non_reserved_mode() {
1455		let peer1 = PeerId::random();
1456		let peer2 = PeerId::random();
1457
1458		let config = ProtoSetConfig {
1459			in_peers: 10,
1460			out_peers: 10,
1461			reserved_nodes: [peer1, peer2].iter().cloned().collect(),
1462			reserved_only: false,
1463		};
1464		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1465
1466		let mut peer_store = MockPeerStoreHandle::new();
1467		peer_store.expect_register_protocol().once().return_const(());
1468		peer_store.expect_is_banned().times(2).return_const(false);
1469		peer_store
1470			.expect_outgoing_candidates()
1471			.once()
1472			.return_const(Vec::<crate::types::PeerId>::new());
1473
1474		let (_handle, mut controller) =
1475			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1476
1477		// Connect `peer1` as inbound, `peer2` as outbound.
1478		controller.on_incoming_connection(peer1, IncomingIndex(1));
1479		controller.alloc_slots();
1480		let mut messages = Vec::new();
1481		while let Some(message) = rx.try_recv().ok() {
1482			messages.push(message);
1483		}
1484		assert_eq!(messages.len(), 2);
1485		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1486		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer2 }));
1487		assert_eq!(controller.num_out, 0);
1488		assert_eq!(controller.num_in, 0);
1489
1490		// Remove reserved nodes (and make them regular)
1491		controller.on_remove_reserved_peer(peer1);
1492		controller.on_remove_reserved_peer(peer2);
1493		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1494		assert_eq!(controller.nodes.len(), 2);
1495		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Inbound)));
1496		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Outbound)));
1497		assert_eq!(controller.num_out, 1);
1498		assert_eq!(controller.num_in, 1);
1499	}
1500
1501	#[test]
1502	fn regular_nodes_stop_occupying_slots_when_become_reserved() {
1503		let peer1 = PeerId::random();
1504		let peer2 = PeerId::random();
1505		let outgoing_candidates = vec![peer1.into()];
1506
1507		let config = ProtoSetConfig {
1508			in_peers: 10,
1509			out_peers: 10,
1510			reserved_nodes: HashSet::new(),
1511			reserved_only: false,
1512		};
1513		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1514
1515		let mut peer_store = MockPeerStoreHandle::new();
1516		peer_store.expect_register_protocol().once().return_const(());
1517		peer_store.expect_is_banned().once().return_const(false);
1518		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1519
1520		let (_handle, mut controller) =
1521			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1522
1523		// Connect `peer1` as outbound & `peer2` as inbound.
1524		controller.alloc_slots();
1525		controller.on_incoming_connection(peer2, IncomingIndex(1));
1526		let mut messages = Vec::new();
1527		while let Some(message) = rx.try_recv().ok() {
1528			messages.push(message);
1529		}
1530		assert_eq!(messages.len(), 2);
1531		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1532		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1533		assert_eq!(controller.num_in, 1);
1534		assert_eq!(controller.num_out, 1);
1535
1536		controller.on_add_reserved_peer(peer1);
1537		controller.on_add_reserved_peer(peer2);
1538		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1539		assert_eq!(controller.num_in, 0);
1540		assert_eq!(controller.num_out, 0);
1541	}
1542
1543	#[test]
1544	fn disconnecting_regular_peers_work() {
1545		let peer1 = PeerId::random();
1546		let peer2 = PeerId::random();
1547		let outgoing_candidates = vec![peer1.into()];
1548
1549		let config = ProtoSetConfig {
1550			in_peers: 10,
1551			out_peers: 10,
1552			reserved_nodes: HashSet::new(),
1553			reserved_only: false,
1554		};
1555		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1556
1557		let mut peer_store = MockPeerStoreHandle::new();
1558		peer_store.expect_register_protocol().once().return_const(());
1559		peer_store.expect_is_banned().once().return_const(false);
1560		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1561
1562		let (_handle, mut controller) =
1563			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1564
1565		// Connect `peer1` as outbound & `peer2` as inbound.
1566		controller.alloc_slots();
1567		controller.on_incoming_connection(peer2, IncomingIndex(1));
1568		let mut messages = Vec::new();
1569		while let Some(message) = rx.try_recv().ok() {
1570			messages.push(message);
1571		}
1572		assert_eq!(messages.len(), 2);
1573		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1574		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1575		assert_eq!(controller.nodes.len(), 2);
1576		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1577		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1578		assert_eq!(controller.num_in, 1);
1579		assert_eq!(controller.num_out, 1);
1580
1581		controller.on_disconnect_peer(peer1);
1582		assert_eq!(
1583			rx.try_recv().unwrap(),
1584			Message::Drop { set_id: SetId::from(0), peer_id: peer1 }
1585		);
1586		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1587		assert_eq!(controller.nodes.len(), 1);
1588		assert!(!controller.nodes.contains_key(&peer1));
1589		assert_eq!(controller.num_in, 1);
1590		assert_eq!(controller.num_out, 0);
1591
1592		controller.on_disconnect_peer(peer2);
1593		assert_eq!(
1594			rx.try_recv().unwrap(),
1595			Message::Drop { set_id: SetId::from(0), peer_id: peer2 }
1596		);
1597		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1598		assert_eq!(controller.nodes.len(), 0);
1599		assert_eq!(controller.num_in, 0);
1600		assert_eq!(controller.num_out, 0);
1601	}
1602
1603	#[test]
1604	fn disconnecting_reserved_peers_is_a_noop() {
1605		let reserved1 = PeerId::random();
1606		let reserved2 = PeerId::random();
1607
1608		let config = ProtoSetConfig {
1609			in_peers: 10,
1610			out_peers: 10,
1611			reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
1612			reserved_only: false,
1613		};
1614		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1615
1616		let mut peer_store = MockPeerStoreHandle::new();
1617		peer_store.expect_register_protocol().once().return_const(());
1618		peer_store.expect_is_banned().times(2).return_const(false);
1619		peer_store.expect_outgoing_candidates().once().return_const(Vec::new());
1620
1621		let (_handle, mut controller) =
1622			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1623
1624		// Connect `reserved1` as inbound & `reserved2` as outbound.
1625		controller.on_incoming_connection(reserved1, IncomingIndex(1));
1626		controller.alloc_slots();
1627		let mut messages = Vec::new();
1628		while let Some(message) = rx.try_recv().ok() {
1629			messages.push(message);
1630		}
1631		assert_eq!(messages.len(), 2);
1632		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1633		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
1634		assert!(matches!(
1635			controller.reserved_nodes.get(&reserved1),
1636			Some(PeerState::Connected(Direction::Inbound))
1637		));
1638		assert!(matches!(
1639			controller.reserved_nodes.get(&reserved2),
1640			Some(PeerState::Connected(Direction::Outbound))
1641		));
1642
1643		controller.on_disconnect_peer(reserved1);
1644		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1645		assert!(matches!(
1646			controller.reserved_nodes.get(&reserved1),
1647			Some(PeerState::Connected(Direction::Inbound))
1648		));
1649
1650		controller.on_disconnect_peer(reserved2);
1651		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1652		assert!(matches!(
1653			controller.reserved_nodes.get(&reserved2),
1654			Some(PeerState::Connected(Direction::Outbound))
1655		));
1656	}
1657
1658	#[test]
1659	fn dropping_regular_peers_work() {
1660		let peer1 = PeerId::random();
1661		let peer2 = PeerId::random();
1662		let outgoing_candidates = vec![peer1.into()];
1663
1664		let config = ProtoSetConfig {
1665			in_peers: 10,
1666			out_peers: 10,
1667			reserved_nodes: HashSet::new(),
1668			reserved_only: false,
1669		};
1670		let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
1671
1672		let mut peer_store = MockPeerStoreHandle::new();
1673		peer_store.expect_register_protocol().once().return_const(());
1674		peer_store.expect_is_banned().once().return_const(false);
1675		peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);
1676		peer_store.expect_report_disconnect().times(2).return_const(());
1677
1678		let (_handle, mut controller) =
1679			ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
1680
1681		// Connect `peer1` as outbound & `peer2` as inbound.
1682		controller.alloc_slots();
1683		controller.on_incoming_connection(peer2, IncomingIndex(1));
1684		let mut messages = Vec::new();
1685		while let Some(message) = rx.try_recv().ok() {
1686			messages.push(message);
1687		}
1688		assert_eq!(messages.len(), 2);
1689		assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: peer1 }));
1690		assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
1691		assert_eq!(controller.nodes.len(), 2);
1692		assert!(matches!(controller.nodes.get(&peer1), Some(Direction::Outbound)));
1693		assert!(matches!(controller.nodes.get(&peer2), Some(Direction::Inbound)));
1694		assert_eq!(controller.num_in, 1);
1695		assert_eq!(controller.num_out, 1);
1696
1697		controller.on_peer_dropped(peer1);
1698		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1699		assert_eq!(controller.nodes.len(), 1);
1700		assert!(!controller.nodes.contains_key(&peer1));
1701		assert_eq!(controller.num_in, 1);
1702		assert_eq!(controller.num_out, 0);
1703
1704		controller.on_peer_dropped(peer2);
1705		assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
1706		assert_eq!(controller.nodes.len(), 0);
1707		assert_eq!(controller.num_in, 0);
1708		assert_eq!(controller.num_out, 0);
1709	}
1710
/// Incoming requests for reserved nodes that are already connected are accepted and leave the
/// node in the `Inbound` direction: `reserved1` (connected as inbound) stays inbound, while
/// `reserved2` (connected as outbound) is switched to inbound.
#[test]
fn incoming_request_for_connected_reserved_node_switches_it_to_inbound() {
	let reserved1 = PeerId::random();
	let reserved2 = PeerId::random();

	let config = ProtoSetConfig {
		in_peers: 10,
		out_peers: 10,
		reserved_nodes: [reserved1, reserved2].iter().cloned().collect(),
		reserved_only: false,
	};
	let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

	let mut peer_store = MockPeerStoreHandle::new();
	peer_store.expect_register_protocol().once().return_const(());
	peer_store.expect_is_banned().times(2).return_const(false);
	peer_store.expect_outgoing_candidates().once().return_const(Vec::new());

	let (_handle, mut controller) =
		ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));

	// Connect `reserved1` as inbound & `reserved2` as outbound.
	controller.on_incoming_connection(reserved1, IncomingIndex(1));
	controller.alloc_slots();
	// Drain every message sent so far; the relative order of the two is not asserted.
	let mut messages = Vec::new();
	while let Ok(message) = rx.try_recv() {
		messages.push(message);
	}
	assert_eq!(messages.len(), 2);
	assert!(messages.contains(&Message::Accept(IncomingIndex(1))));
	assert!(messages.contains(&Message::Connect { set_id: SetId::from(0), peer_id: reserved2 }));
	assert!(matches!(
		controller.reserved_nodes.get(&reserved1),
		Some(PeerState::Connected(Direction::Inbound))
	));
	assert!(matches!(
		controller.reserved_nodes.get(&reserved2),
		Some(PeerState::Connected(Direction::Outbound))
	));

	// Incoming request for `reserved1`: accepted, direction stays inbound.
	controller.on_incoming_connection(reserved1, IncomingIndex(2));
	assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(2)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(
		controller.reserved_nodes.get(&reserved1),
		Some(PeerState::Connected(Direction::Inbound))
	));

	// Incoming request for `reserved2`: accepted, direction flips from outbound to inbound.
	controller.on_incoming_connection(reserved2, IncomingIndex(3));
	assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(3)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(
		controller.reserved_nodes.get(&reserved2),
		Some(PeerState::Connected(Direction::Inbound))
	));
}
1769
/// Incoming requests for regular nodes that are already connected are accepted and leave the
/// node in the `Inbound` direction: `regular1` (connected as outbound) is switched to inbound,
/// while `regular2` (connected as inbound) stays inbound.
#[test]
fn incoming_request_for_connected_regular_node_switches_it_to_inbound() {
	let regular1 = PeerId::random();
	let regular2 = PeerId::random();
	let outgoing_candidates = vec![regular1.into()];

	let config = ProtoSetConfig {
		in_peers: 10,
		out_peers: 10,
		reserved_nodes: HashSet::new(),
		reserved_only: false,
	};
	let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

	let mut peer_store = MockPeerStoreHandle::new();
	peer_store.expect_register_protocol().once().return_const(());
	peer_store.expect_is_banned().times(3).return_const(false);
	peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

	let (_handle, mut controller) =
		ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
	assert_eq!(controller.num_out, 0);
	assert_eq!(controller.num_in, 0);

	// Connect `regular1` as outbound.
	controller.alloc_slots();
	assert_eq!(
		rx.try_recv().unwrap(),
		Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
	);
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

	// Connect `regular2` as inbound.
	controller.on_incoming_connection(regular2, IncomingIndex(0));
	assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

	// Incoming request for `regular1`: accepted, direction flips from outbound to inbound.
	controller.on_incoming_connection(regular1, IncomingIndex(1));
	assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(1)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Inbound)));

	// Incoming request for `regular2`: accepted, direction stays inbound.
	controller.on_incoming_connection(regular2, IncomingIndex(2));
	assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(2)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));
}
1821
/// Incoming requests for already-connected regular nodes are rejected once the peer store
/// reports them as banned, and the nodes are removed from the controller's `nodes` map.
#[test]
fn incoming_request_for_connected_node_is_rejected_if_its_banned() {
	let regular1 = PeerId::random();
	let regular2 = PeerId::random();
	let outgoing_candidates = vec![regular1.into()];

	let config = ProtoSetConfig {
		in_peers: 10,
		out_peers: 10,
		reserved_nodes: HashSet::new(),
		reserved_only: false,
	};
	let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

	let mut peer_store = MockPeerStoreHandle::new();
	peer_store.expect_register_protocol().once().return_const(());
	// The first `is_banned` expectation answers `false` once; the second answers `true` twice.
	peer_store.expect_is_banned().once().return_const(false);
	peer_store.expect_is_banned().times(2).return_const(true);
	peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

	let (_handle, mut controller) =
		ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
	assert_eq!(controller.num_out, 0);
	assert_eq!(controller.num_in, 0);

	// Connect `regular1` as outbound.
	controller.alloc_slots();
	assert_eq!(
		rx.try_recv().unwrap(),
		Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
	);
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

	// Connect `regular2` as inbound.
	controller.on_incoming_connection(regular2, IncomingIndex(0));
	assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

	// Incoming request for `regular1`: rejected and the node is forgotten.
	controller.on_incoming_connection(regular1, IncomingIndex(1));
	assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(!controller.nodes.contains_key(&regular1));

	// Incoming request for `regular2`: rejected and the node is forgotten.
	controller.on_incoming_connection(regular2, IncomingIndex(2));
	assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(!controller.nodes.contains_key(&regular2));
}
1874
/// Incoming requests for already-connected regular nodes are rejected after `max_in` has been
/// lowered to zero, and the nodes are removed from the controller's `nodes` map.
#[test]
fn incoming_request_for_connected_node_is_rejected_if_no_slots_available() {
	let regular1 = PeerId::random();
	let regular2 = PeerId::random();
	let outgoing_candidates = vec![regular1.into()];

	let config = ProtoSetConfig {
		in_peers: 1,
		out_peers: 1,
		reserved_nodes: HashSet::new(),
		reserved_only: false,
	};
	let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);

	let mut peer_store = MockPeerStoreHandle::new();
	peer_store.expect_register_protocol().once().return_const(());
	peer_store.expect_is_banned().once().return_const(false);
	peer_store.expect_outgoing_candidates().once().return_const(outgoing_candidates);

	let (_handle, mut controller) =
		ProtocolController::new(SetId::from(0), config, tx, Arc::new(peer_store));
	assert_eq!(controller.num_out, 0);
	assert_eq!(controller.num_in, 0);

	// Connect `regular1` as outbound.
	controller.alloc_slots();
	assert_eq!(
		rx.try_recv().unwrap(),
		Message::Connect { set_id: SetId::from(0), peer_id: regular1 }
	);
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(controller.nodes.get(&regular1), Some(Direction::Outbound)));

	// Connect `regular2` as inbound.
	controller.on_incoming_connection(regular2, IncomingIndex(0));
	assert_eq!(rx.try_recv().unwrap(), Message::Accept(IncomingIndex(0)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(matches!(controller.nodes.get(&regular2), Some(Direction::Inbound)));

	// Drop the inbound limit to zero before replaying incoming requests for both nodes.
	controller.max_in = 0;

	// Incoming request for `regular1`: rejected and the node is forgotten.
	controller.on_incoming_connection(regular1, IncomingIndex(1));
	assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(1)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(!controller.nodes.contains_key(&regular1));

	// Incoming request for `regular2`: rejected and the node is forgotten.
	controller.on_incoming_connection(regular2, IncomingIndex(2));
	assert_eq!(rx.try_recv().unwrap(), Message::Reject(IncomingIndex(2)));
	assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
	assert!(!controller.nodes.contains_key(&regular2));
}
1928
/// With `in_peers` set to 1, the first incoming peer is accepted and the second one is
/// rejected.
#[test]
fn incoming_peers_that_exceed_slots_are_rejected() {
	let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
	let first_peer = PeerId::random();
	let second_peer = PeerId::random();

	let mut store = MockPeerStoreHandle::new();
	store.expect_register_protocol().once().return_const(());
	store.expect_is_banned().once().return_const(false);

	let set_config = ProtoSetConfig {
		in_peers: 1,
		out_peers: 10,
		reserved_nodes: HashSet::new(),
		reserved_only: false,
	};
	let (_handle, mut controller) =
		ProtocolController::new(SetId::from(0), set_config, tx, Arc::new(store));

	// The first peer is accepted as inbound.
	controller.on_incoming_connection(first_peer, IncomingIndex(1));
	assert_eq!(rx.try_recv(), Ok(Message::Accept(IncomingIndex(1))));
	assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

	// The second peer's incoming request is turned down.
	controller.on_incoming_connection(second_peer, IncomingIndex(2));
	assert_eq!(rx.try_recv(), Ok(Message::Reject(IncomingIndex(2))));
	assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
1959
/// An incoming request from a regular peer that the peer store reports as banned is answered
/// with a single `Reject` message.
#[test]
fn banned_regular_incoming_node_is_rejected() {
	let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
	let banned_peer = PeerId::random();

	// The peer store reports the (single) queried peer as banned.
	let mut store = MockPeerStoreHandle::new();
	store.expect_register_protocol().once().return_const(());
	store.expect_is_banned().once().return_const(true);

	let set_config = ProtoSetConfig {
		in_peers: 10,
		out_peers: 10,
		reserved_nodes: HashSet::new(),
		reserved_only: false,
	};
	let (_handle, mut controller) =
		ProtocolController::new(SetId::from(0), set_config, tx, Arc::new(store));

	// The incoming request must produce exactly one message: the rejection.
	controller.on_incoming_connection(banned_peer, IncomingIndex(1));
	assert_eq!(rx.try_recv(), Ok(Message::Reject(IncomingIndex(1))));
	assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
1984
/// An incoming request from a reserved peer that the peer store reports as banned is answered
/// with a single `Reject` message.
#[test]
fn banned_reserved_incoming_node_is_rejected() {
	let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
	let banned_reserved = PeerId::random();

	// The peer store reports the (single) queried peer as banned.
	let mut store = MockPeerStoreHandle::new();
	store.expect_register_protocol().once().return_const(());
	store.expect_is_banned().once().return_const(true);

	let set_config = ProtoSetConfig {
		in_peers: 10,
		out_peers: 10,
		reserved_nodes: [banned_reserved].iter().copied().collect(),
		reserved_only: false,
	};
	let (_handle, mut controller) =
		ProtocolController::new(SetId::from(0), set_config, tx, Arc::new(store));
	// The configured reserved node is tracked by the controller from the start.
	assert!(controller.reserved_nodes.contains_key(&banned_reserved));

	// The incoming request must produce exactly one message: the rejection.
	controller.on_incoming_connection(banned_reserved, IncomingIndex(1));
	assert_eq!(rx.try_recv(), Ok(Message::Reject(IncomingIndex(1))));
	assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
2010
/// Slot allocation sends no message for a reserved node that the peer store reports as
/// banned, and the node remains `NotConnected`.
#[test]
fn we_dont_connect_to_banned_reserved_node() {
	let (tx, mut rx) = tracing_unbounded("mpsc_test_to_notifications", 100);
	let banned_reserved = PeerId::random();

	// The reserved node is reported as banned; no regular candidates are offered.
	let mut store = MockPeerStoreHandle::new();
	store.expect_register_protocol().once().return_const(());
	store.expect_is_banned().once().return_const(true);
	store.expect_outgoing_candidates().once().return_const(Vec::new());

	let set_config = ProtoSetConfig {
		in_peers: 10,
		out_peers: 10,
		reserved_nodes: [banned_reserved].iter().copied().collect(),
		reserved_only: false,
	};
	let (_handle, mut controller) =
		ProtocolController::new(SetId::from(0), set_config, tx, Arc::new(store));
	let state_before = controller.reserved_nodes.get(&banned_reserved);
	assert!(matches!(state_before, Some(PeerState::NotConnected)));

	// Initiate connections: the state must not change and nothing may be sent.
	controller.alloc_slots();
	let state_after = controller.reserved_nodes.get(&banned_reserved);
	assert!(matches!(state_after, Some(PeerState::NotConnected)));
	assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
2037}