use crate::{
peer_store::{PeerStoreProvider, ProtocolHandle},
service::traits::{self, ValidationResult},
ProtocolName, ReputationChange as Reputation,
};
use futures::{channel::oneshot, future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use futures_timer::Delay;
use litep2p::protocol::notification::NotificationError;
use sc_network_types::PeerId;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{
collections::{HashMap, HashSet},
future::Future,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
const LOG_TARGET: &str = "sub-libp2p::peerset";
const DEFAULT_BACKOFF: Duration = Duration::from_secs(5);
const OPEN_FAILURE_BACKOFF: Duration = Duration::from_secs(5);
const SLOT_ALLOCATION_FREQUENCY: Duration = Duration::from_secs(1);
const DISCONNECT_ADJUSTMENT: Reputation = Reputation::new(-256, "Peer disconnected");
const OPEN_FAILURE_ADJUSTMENT: Reputation = Reputation::new(-1024, "Open failure");
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Reserved {
Yes,
No,
}
impl From<bool> for Reserved {
fn from(value: bool) -> Reserved {
match value {
true => Reserved::Yes,
false => Reserved::No,
}
}
}
impl From<Reserved> for bool {
fn from(value: Reserved) -> bool {
std::matches!(value, Reserved::Yes)
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
Inbound(Reserved),
Outbound(Reserved),
}
impl Direction {
fn set_reserved(&mut self, new_reserved: Reserved) {
match self {
Direction::Inbound(ref mut reserved) | Direction::Outbound(ref mut reserved) => {
*reserved = new_reserved
},
}
}
}
impl From<Direction> for traits::Direction {
fn from(direction: Direction) -> traits::Direction {
match direction {
Direction::Inbound(_) => traits::Direction::Inbound,
Direction::Outbound(_) => traits::Direction::Outbound,
}
}
}
#[derive(PartialEq, Eq, Debug)]
pub enum OpenResult {
Accept {
direction: traits::Direction,
},
Reject,
}
#[derive(Debug)]
pub enum PeersetCommand {
SetReservedPeers {
peers: HashSet<PeerId>,
},
AddReservedPeers {
peers: HashSet<PeerId>,
},
RemoveReservedPeers {
peers: HashSet<PeerId>,
},
SetReservedOnly {
reserved_only: bool,
},
DisconnectPeer {
peer: PeerId,
},
GetReservedPeers {
tx: oneshot::Sender<Vec<PeerId>>,
},
}
#[derive(Debug)]
pub struct PeersetNotificationCommand {
pub open_peers: Vec<PeerId>,
pub close_peers: Vec<PeerId>,
}
impl PeersetNotificationCommand {
pub fn open_substream(peers: Vec<PeerId>) -> Self {
Self { open_peers: peers, close_peers: vec![] }
}
pub fn close_substream(peers: Vec<PeerId>) -> Self {
Self { open_peers: vec![], close_peers: peers }
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum PeerState {
Disconnected,
Backoff,
Opening {
direction: Direction,
},
Connected {
direction: Direction,
},
Canceled {
direction: Direction,
},
Closing {
direction: Direction,
},
}
#[derive(Debug)]
pub struct Peerset {
protocol: ProtocolName,
cmd_rx: TracingUnboundedReceiver<PeersetCommand>,
max_out: usize,
num_out: usize,
max_in: usize,
num_in: usize,
reserved_only: bool,
reserved_peers: HashSet<PeerId>,
peerstore_handle: Arc<dyn PeerStoreProvider>,
peers: HashMap<PeerId, PeerState>,
connected_peers: Arc<AtomicUsize>,
pending_backoffs: FuturesUnordered<BoxFuture<'static, (PeerId, Reputation)>>,
next_slot_allocation: Delay,
}
macro_rules! decrement_or_warn {
($slot:expr, $protocol:expr, $peer:expr, $direction:expr) => {{
match $slot.checked_sub(1) {
Some(value) => {
$slot = value;
}
None => {
log::warn!(
target: LOG_TARGET,
"{}: state mismatch, {:?} is not counted as part of {:?} slots",
$protocol, $peer, $direction
);
debug_assert!(false);
}
}
}};
}
#[derive(Debug)]
struct PeersetHandle {
tx: TracingUnboundedSender<PeersetCommand>,
}
impl ProtocolHandle for PeersetHandle {
fn disconnect_peer(&self, peer: PeerId) {
let _ = self.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
}
}
impl Peerset {
pub fn new(
protocol: ProtocolName,
max_out: usize,
max_in: usize,
reserved_only: bool,
reserved_peers: HashSet<PeerId>,
connected_peers: Arc<AtomicUsize>,
peerstore_handle: Arc<dyn PeerStoreProvider>,
) -> (Self, TracingUnboundedSender<PeersetCommand>) {
let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc-peerset-protocol", 100_000);
let peers = reserved_peers
.iter()
.map(|peer| (*peer, PeerState::Disconnected))
.collect::<HashMap<_, _>>();
peerstore_handle.register_protocol(Arc::new(PeersetHandle { tx: cmd_tx.clone() }));
log::debug!(
target: LOG_TARGET,
"{}: creating new peerset with max_outbound {} and max_inbound {} and reserved_only {}",
protocol,
max_out,
max_in,
reserved_only,
);
(
Self {
protocol,
max_out,
num_out: 0usize,
max_in,
num_in: 0usize,
reserved_peers,
cmd_rx,
peerstore_handle,
reserved_only,
peers,
connected_peers,
pending_backoffs: FuturesUnordered::new(),
next_slot_allocation: Delay::new(SLOT_ALLOCATION_FREQUENCY),
},
cmd_tx,
)
}
pub fn report_substream_opened(
&mut self,
peer: PeerId,
direction: traits::Direction,
) -> OpenResult {
log::trace!(
target: LOG_TARGET,
"{}: substream opened to {peer:?}, direction {direction:?}, reserved peer {}",
self.protocol,
self.reserved_peers.contains(&peer),
);
let Some(state) = self.peers.get_mut(&peer) else {
log::warn!(target: LOG_TARGET, "{}: substream opened for unknown peer {peer:?}", self.protocol);
debug_assert!(false);
return OpenResult::Reject;
};
match state {
PeerState::Opening { direction: substream_direction } => {
let real_direction: traits::Direction = (*substream_direction).into();
*state = PeerState::Connected { direction: *substream_direction };
self.connected_peers.fetch_add(1usize, Ordering::Relaxed);
return OpenResult::Accept { direction: real_direction };
},
PeerState::Canceled { direction: substream_direction } => {
log::trace!(
target: LOG_TARGET,
"{}: substream to {peer:?} is canceled, issue disconnection request",
self.protocol,
);
self.connected_peers.fetch_add(1usize, Ordering::Relaxed);
*state = PeerState::Closing { direction: *substream_direction };
return OpenResult::Reject;
},
PeerState::Disconnected => {
log::debug!(
target: LOG_TARGET,
"{}: substream opened for a peer that was previously rejected {peer:?}",
self.protocol,
);
return OpenResult::Reject;
},
state => {
log::error!(
target: LOG_TARGET,
"{}: substream opened for a peer in invalid state {peer:?}: {state:?}",
self.protocol,
);
debug_assert!(false);
return OpenResult::Reject;
},
}
}
pub fn report_substream_closed(&mut self, peer: PeerId) {
log::trace!(target: LOG_TARGET, "{}: substream closed to {peer:?}", self.protocol);
let Some(state) = self.peers.get_mut(&peer) else {
log::warn!(target: LOG_TARGET, "{}: substream closed for unknown peer {peer:?}", self.protocol);
debug_assert!(false);
return;
};
match &state {
PeerState::Connected { direction: Direction::Inbound(Reserved::No) } |
PeerState::Closing { direction: Direction::Inbound(Reserved::No) } => {
log::trace!(
target: LOG_TARGET,
"{}: inbound substream closed to non-reserved peer {peer:?}: {state:?}",
self.protocol,
);
decrement_or_warn!(
self.num_in,
peer,
self.protocol,
Direction::Inbound(Reserved::No)
);
},
PeerState::Connected { direction: Direction::Outbound(Reserved::No) } |
PeerState::Closing { direction: Direction::Outbound(Reserved::No) } => {
log::trace!(
target: LOG_TARGET,
"{}: outbound substream closed to non-reserved peer {peer:?} {state:?}",
self.protocol,
);
decrement_or_warn!(
self.num_out,
peer,
self.protocol,
Direction::Outbound(Reserved::No)
);
},
PeerState::Closing { .. } | PeerState::Connected { .. } => {
log::debug!(target: LOG_TARGET, "{}: reserved peer {peer:?} disconnected", self.protocol);
},
PeerState::Disconnected => {
log::debug!(
target: LOG_TARGET,
"{}: substream closed for a peer that was previously rejected {peer:?}",
self.protocol,
);
},
state => {
log::warn!(target: LOG_TARGET, "{}: invalid state for disconnected peer {peer:?}: {state:?}", self.protocol);
debug_assert!(false);
},
}
if !matches!(state, PeerState::Disconnected) {
self.connected_peers.fetch_sub(1usize, Ordering::Relaxed);
}
*state = PeerState::Backoff;
self.pending_backoffs.push(Box::pin(async move {
Delay::new(DEFAULT_BACKOFF).await;
(peer, DISCONNECT_ADJUSTMENT)
}));
}
pub fn report_inbound_substream(&mut self, peer: PeerId) -> ValidationResult {
log::trace!(target: LOG_TARGET, "{}: inbound substream from {peer:?}", self.protocol);
if self.peerstore_handle.is_banned(&peer) {
log::debug!(
target: LOG_TARGET,
"{}: rejecting banned peer {peer:?}",
self.protocol,
);
return ValidationResult::Reject;
}
let state = self.peers.entry(peer).or_insert(PeerState::Disconnected);
let is_reserved_peer = self.reserved_peers.contains(&peer);
let should_reject = self.reserved_only && !is_reserved_peer;
match state {
PeerState::Disconnected if should_reject => {
log::trace!(
target: LOG_TARGET,
"{}: rejecting non-reserved peer {peer:?} in reserved-only mode (prev state: {state:?})",
self.protocol,
);
return ValidationResult::Reject;
},
PeerState::Disconnected => {},
PeerState::Backoff => {
if !is_reserved_peer && self.num_in == self.max_in {
log::trace!(
target: LOG_TARGET,
"{}: ({peer:?}) is backed-off and cannot accept, reject inbound substream",
self.protocol,
);
return ValidationResult::Reject;
}
if should_reject {
return ValidationResult::Reject;
}
},
PeerState::Opening { direction: Direction::Outbound(reserved) } => {
if should_reject {
log::trace!(
target: LOG_TARGET,
"{}: rejecting inbound substream from {peer:?} ({reserved:?}) in reserved-only mode that was marked outbound",
self.protocol,
);
*state = PeerState::Canceled { direction: Direction::Outbound(*reserved) };
return ValidationResult::Reject;
}
log::trace!(
target: LOG_TARGET,
"{}: inbound substream received for {peer:?} ({reserved:?}) that was marked outbound",
self.protocol,
);
return ValidationResult::Accept;
},
PeerState::Canceled { direction } => {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} is canceled, rejecting substream should_reject={should_reject}",
self.protocol,
);
*state = PeerState::Canceled { direction: *direction };
return ValidationResult::Reject;
},
state => {
log::warn!(
target: LOG_TARGET,
"{}: invalid state ({state:?}) for inbound substream, peer {peer:?}",
self.protocol
);
debug_assert!(false);
return ValidationResult::Reject;
},
}
if is_reserved_peer {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} accepting peer as reserved peer",
self.protocol,
);
*state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
return ValidationResult::Accept;
}
if self.num_in < self.max_in {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} accepting peer as regular peer",
self.protocol,
);
self.num_in += 1;
*state = PeerState::Opening { direction: Direction::Inbound(is_reserved_peer.into()) };
return ValidationResult::Accept;
}
log::trace!(
target: LOG_TARGET,
"{}: reject {peer:?}, not a reserved peer and no free inbound slots",
self.protocol,
);
*state = PeerState::Disconnected;
return ValidationResult::Reject;
}
pub fn report_substream_open_failure(&mut self, peer: PeerId, error: NotificationError) {
log::trace!(
target: LOG_TARGET,
"{}: failed to open substream to {peer:?}: {error:?}",
self.protocol,
);
match self.peers.get(&peer) {
Some(PeerState::Opening { direction: Direction::Outbound(Reserved::No) }) => {
decrement_or_warn!(
self.num_out,
self.protocol,
peer,
Direction::Outbound(Reserved::No)
);
},
Some(PeerState::Opening { direction: Direction::Inbound(Reserved::No) }) => {
decrement_or_warn!(
self.num_in,
self.protocol,
peer,
Direction::Inbound(Reserved::No)
);
},
Some(PeerState::Canceled { direction }) => match direction {
Direction::Inbound(Reserved::No) => {
decrement_or_warn!(
self.num_in,
self.protocol,
peer,
Direction::Inbound(Reserved::No)
);
},
Direction::Outbound(Reserved::No) => {
decrement_or_warn!(
self.num_out,
self.protocol,
peer,
Direction::Outbound(Reserved::No)
);
},
_ => {},
},
Some(PeerState::Opening { direction: Direction::Inbound(Reserved::Yes) }) |
Some(PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }) => {
log::debug!(
target: LOG_TARGET,
"{}: substream open failure for reserved peer {peer:?}",
self.protocol,
);
},
state => {
log::debug!(
target: LOG_TARGET,
"{}: substream open failure for a unknown state: {state:?}",
self.protocol,
);
return;
},
}
self.peers.insert(peer, PeerState::Backoff);
self.pending_backoffs.push(Box::pin(async move {
Delay::new(OPEN_FAILURE_BACKOFF).await;
(peer, OPEN_FAILURE_ADJUSTMENT)
}));
}
pub fn report_substream_rejected(&mut self, peer: PeerId) {
log::trace!(target: LOG_TARGET, "{}: {peer:?} rejected by the protocol", self.protocol);
match self.peers.remove(&peer) {
Some(PeerState::Opening { direction }) => match direction {
Direction::Inbound(Reserved::Yes) | Direction::Outbound(Reserved::Yes) => {
log::warn!(
target: LOG_TARGET,
"{}: reserved peer {peer:?} rejected by the protocol",
self.protocol,
);
self.peers.insert(peer, PeerState::Disconnected);
},
Direction::Inbound(Reserved::No) => {
decrement_or_warn!(
self.num_in,
peer,
self.protocol,
Direction::Inbound(Reserved::No)
);
self.peers.insert(peer, PeerState::Disconnected);
},
Direction::Outbound(Reserved::No) => {
decrement_or_warn!(
self.num_out,
peer,
self.protocol,
Direction::Outbound(Reserved::No)
);
self.peers.insert(peer, PeerState::Disconnected);
},
},
Some(state @ PeerState::Canceled { .. }) => {
log::debug!(
target: LOG_TARGET,
"{}: substream to {peer:?} rejected by protocol but already canceled",
self.protocol,
);
self.peers.insert(peer, state);
},
Some(state) => {
log::debug!(
target: LOG_TARGET,
"{}: {peer:?} rejected by the protocol but not opening anymore: {state:?}",
self.protocol,
);
self.peers.insert(peer, state);
},
None => {},
}
}
fn calculate_slot_adjustment<'a>(
&'a mut self,
peers: impl Iterator<Item = &'a PeerId>,
) -> (usize, usize) {
peers.fold((0, 0), |(mut inbound, mut outbound), peer| {
match self.peers.get_mut(peer) {
Some(PeerState::Disconnected | PeerState::Backoff) => {},
Some(
PeerState::Opening { ref mut direction } |
PeerState::Connected { ref mut direction } |
PeerState::Canceled { ref mut direction } |
PeerState::Closing { ref mut direction },
) => {
*direction = match direction {
Direction::Inbound(Reserved::No) => {
inbound += 1;
Direction::Inbound(Reserved::Yes)
},
Direction::Outbound(Reserved::No) => {
outbound += 1;
Direction::Outbound(Reserved::Yes)
},
ref direction => **direction,
};
},
None => {
self.peers.insert(*peer, PeerState::Disconnected);
},
}
(inbound, outbound)
})
}
fn should_disconnect(&self, direction: Direction) -> bool {
match direction {
Direction::Inbound(_) => self.num_in >= self.max_in,
Direction::Outbound(_) => self.num_out >= self.max_out,
}
}
fn increment_slot(&mut self, direction: Direction) {
match direction {
Direction::Inbound(Reserved::No) => self.num_in += 1,
Direction::Outbound(Reserved::No) => self.num_out += 1,
_ => {},
}
}
fn connect_reserved_peers(&mut self) -> Vec<PeerId> {
self.reserved_peers
.iter()
.filter_map(|peer| {
let peer_state = self.peers.get(peer);
if peer_state != Some(&PeerState::Disconnected) {
return None;
}
if self.peerstore_handle.is_banned(peer) {
return None;
}
self.peers.insert(
*peer,
PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) },
);
Some(*peer)
})
.collect::<Vec<_>>()
}
#[cfg(test)]
pub fn num_in(&self) -> usize {
self.num_in
}
#[cfg(test)]
pub fn num_out(&self) -> usize {
self.num_out
}
#[cfg(test)]
pub fn peers(&self) -> &HashMap<PeerId, PeerState> {
&self.peers
}
#[cfg(test)]
pub fn peers_mut(&mut self) -> &mut HashMap<PeerId, PeerState> {
&mut self.peers
}
#[cfg(test)]
pub fn reserved_peers(&self) -> &HashSet<PeerId> {
&self.reserved_peers
}
}
impl Stream for Peerset {
type Item = PeersetNotificationCommand;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while let Poll::Ready(Some((peer, reputation))) = self.pending_backoffs.poll_next_unpin(cx)
{
log::trace!(target: LOG_TARGET, "{}: backoff expired for {peer:?}", self.protocol);
if std::matches!(self.peers.get(&peer), None | Some(PeerState::Backoff)) {
self.peers.insert(peer, PeerState::Disconnected);
}
self.peerstore_handle.report_peer(peer, reputation);
}
if let Poll::Ready(Some(action)) = Pin::new(&mut self.cmd_rx).poll_next(cx) {
log::trace!(target: LOG_TARGET, "{}: received command {action:?}", self.protocol);
match action {
PeersetCommand::DisconnectPeer { peer } if !self.reserved_peers.contains(&peer) => {
match self.peers.remove(&peer) {
Some(PeerState::Connected { direction }) => {
log::trace!(
target: LOG_TARGET,
"{}: close connection to {peer:?}, direction {direction:?}",
self.protocol,
);
self.peers.insert(peer, PeerState::Closing { direction });
return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
vec![peer],
)));
},
Some(PeerState::Backoff) => {
log::trace!(
target: LOG_TARGET,
"{}: cannot disconnect {peer:?}, already backed-off",
self.protocol,
);
self.peers.insert(peer, PeerState::Backoff);
},
Some(PeerState::Opening { direction }) => {
log::trace!(
target: LOG_TARGET,
"{}: canceling substream to disconnect peer {peer:?}",
self.protocol,
);
self.peers.insert(peer, PeerState::Canceled { direction });
},
Some(state @ PeerState::Closing { .. }) => {
log::trace!(
target: LOG_TARGET,
"{}: cannot disconnect {peer:?}, already closing ({state:?})",
self.protocol,
);
self.peers.insert(peer, state);
},
Some(state @ PeerState::Disconnected) => {
self.peers.insert(peer, state);
},
Some(state @ PeerState::Canceled { .. }) => {
log::debug!(
target: LOG_TARGET,
"{}: cannot disconnect {peer:?}, already canceled ({state:?})",
self.protocol,
);
self.peers.insert(peer, state);
},
None => {
log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
},
}
},
PeersetCommand::DisconnectPeer { peer } => {
log::debug!(
target: LOG_TARGET,
"{}: ignoring disconnection request for reserved peer {peer}",
self.protocol,
);
},
PeersetCommand::SetReservedPeers { peers } => {
log::debug!(target: LOG_TARGET, "{}: set reserved peers {peers:?}", self.protocol);
let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
self.num_out -= out_peers;
self.num_in -= in_peers;
let reserved_peers_maybe_remove =
self.reserved_peers.difference(&peers).cloned().collect::<Vec<_>>();
self.reserved_peers = peers;
let peers_to_remove = reserved_peers_maybe_remove
.into_iter()
.filter(|peer| {
match self.peers.remove(&peer) {
Some(PeerState::Connected { mut direction }) => {
let disconnect =
self.reserved_only || self.should_disconnect(direction);
if disconnect {
log::trace!(
target: LOG_TARGET,
"{}: close connection to previously reserved {peer:?}, direction {direction:?}",
self.protocol,
);
self.peers.insert(*peer, PeerState::Closing { direction });
true
} else {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} is no longer reserved, move to regular peers, direction {direction:?}",
self.protocol,
);
direction.set_reserved(Reserved::No);
self.increment_slot(direction);
self.peers
.insert(*peer, PeerState::Connected { direction });
false
}
},
Some(PeerState::Opening { direction }) => {
log::trace!(
target: LOG_TARGET,
"{}: cancel substream to {peer:?}, direction {direction:?}",
self.protocol,
);
self.peers.insert(*peer, PeerState::Canceled { direction });
false
},
Some(state) => {
self.peers.insert(*peer, state);
false
},
None => {
log::debug!(target: LOG_TARGET, "{}: {peer:?} doesn't exist", self.protocol);
debug_assert!(false);
false
},
}
})
.collect();
let connect_to = self.connect_reserved_peers();
let command = PeersetNotificationCommand {
open_peers: connect_to,
close_peers: peers_to_remove,
};
log::trace!(
target: LOG_TARGET,
"{}: SetReservedPeers result {command:?}",
self.protocol,
);
return Poll::Ready(Some(command));
},
PeersetCommand::AddReservedPeers { peers } => {
log::debug!(target: LOG_TARGET, "{}: add reserved peers {peers:?}", self.protocol);
let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
self.num_out -= out_peers;
self.num_in -= in_peers;
let peers = peers
.iter()
.filter_map(|peer| {
if !self.reserved_peers.insert(*peer) {
log::warn!(
target: LOG_TARGET,
"{}: {peer:?} is already a reserved peer",
self.protocol,
);
return None;
}
std::matches!(
self.peers.get_mut(peer),
None | Some(PeerState::Disconnected)
)
.then(|| {
self.peers.insert(
*peer,
PeerState::Opening {
direction: Direction::Outbound(Reserved::Yes),
},
);
*peer
})
})
.collect();
log::debug!(target: LOG_TARGET, "{}: start connecting to {peers:?}", self.protocol);
return Poll::Ready(Some(PeersetNotificationCommand::open_substream(peers)));
},
PeersetCommand::RemoveReservedPeers { peers } => {
log::debug!(target: LOG_TARGET, "{}: remove reserved peers {peers:?}", self.protocol);
let peers_to_remove = peers
.iter()
.filter_map(|peer| {
if !self.reserved_peers.remove(peer) {
log::debug!(
target: LOG_TARGET,
"{}: {peer} is not a reserved peer",
self.protocol,
);
return None
}
match self.peers.remove(peer)? {
PeerState::Backoff => {
log::trace!(
target: LOG_TARGET,
"{}: cannot disconnect removed reserved peer {peer:?}, already backed-off",
self.protocol,
);
self.peers.insert(*peer, PeerState::Backoff);
None
},
PeerState::Canceled { direction } => {
log::trace!(
target: LOG_TARGET,
"{}: cannot disconnect removed reserved peer {peer:?}, already canceled",
self.protocol,
);
self.peers.insert(*peer, PeerState::Canceled { direction });
None
},
PeerState::Disconnected => {
log::trace!(
target: LOG_TARGET,
"{}: cannot disconnect removed reserved peer {peer:?}, already disconnected",
self.protocol,
);
self.peers.insert(*peer, PeerState::Disconnected);
None
},
PeerState::Closing { direction } => {
log::trace!(
target: LOG_TARGET,
"{}: cannot disconnect removed reserved peer {peer:?}, already closing",
self.protocol,
);
self.peers.insert(*peer, PeerState::Closing { direction });
None
},
PeerState::Connected { mut direction } => {
let disconnect = self.should_disconnect(direction);
if disconnect {
log::trace!(
target: LOG_TARGET,
"{}: close connection to removed reserved {peer:?}, direction {direction:?}",
self.protocol,
);
self.peers.insert(*peer, PeerState::Closing { direction });
Some(*peer)
} else {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} converted to regular peer {peer:?} direction {direction:?}",
self.protocol,
);
direction.set_reserved(Reserved::No);
self.increment_slot(direction);
self.peers
.insert(*peer, PeerState::Connected { direction });
None
}
},
PeerState::Opening { mut direction } => {
let disconnect = self.should_disconnect(direction);
if disconnect {
log::trace!(
target: LOG_TARGET,
"{}: cancel substream to disconnect removed reserved peer {peer:?}, direction {direction:?}",
self.protocol,
);
self.peers.insert(
*peer,
PeerState::Canceled {
direction
},
);
} else {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} converted to regular peer {peer:?} direction {direction:?}",
self.protocol,
);
direction.set_reserved(Reserved::No);
self.increment_slot(direction);
self.peers
.insert(*peer, PeerState::Opening { direction });
}
None
},
}
})
.collect();
log::debug!(
target: LOG_TARGET,
"{}: close substreams to {peers_to_remove:?}",
self.protocol,
);
return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
peers_to_remove,
)));
},
PeersetCommand::SetReservedOnly { reserved_only } => {
log::debug!(target: LOG_TARGET, "{}: set reserved only mode to {reserved_only}", self.protocol);
self.reserved_only = reserved_only;
if reserved_only {
let peers_to_remove = self
.peers
.iter()
.filter_map(|(peer, state)| {
(!self.reserved_peers.contains(peer) &&
std::matches!(state, PeerState::Connected { .. }))
.then_some(*peer)
})
.collect::<Vec<_>>();
self.peers.iter_mut().for_each(|(_, state)| match state {
PeerState::Connected { direction } => {
*state = PeerState::Closing { direction: *direction };
},
PeerState::Opening { direction } => {
*state = PeerState::Canceled { direction: *direction };
},
_ => {},
});
return Poll::Ready(Some(PeersetNotificationCommand::close_substream(
peers_to_remove,
)));
}
},
PeersetCommand::GetReservedPeers { tx } => {
let _ = tx.send(self.reserved_peers.iter().cloned().collect());
},
}
}
if let Poll::Ready(()) = Pin::new(&mut self.next_slot_allocation).poll(cx) {
let mut connect_to = self.connect_reserved_peers();
if self.num_out < self.max_out && !self.reserved_only {
let ignore: HashSet<PeerId> = self
.peers
.iter()
.filter_map(|(peer, state)| {
(!std::matches!(state, PeerState::Disconnected)).then_some(*peer)
})
.chain(self.reserved_peers.iter().cloned())
.collect();
let peers: Vec<_> =
self.peerstore_handle.outgoing_candidates(self.max_out - self.num_out, ignore);
if peers.len() > 0 {
peers.iter().for_each(|peer| {
self.peers.insert(
*peer,
PeerState::Opening { direction: Direction::Outbound(Reserved::No) },
);
});
self.num_out += peers.len();
connect_to.extend(peers);
}
}
self.next_slot_allocation = Delay::new(SLOT_ALLOCATION_FREQUENCY);
if !connect_to.is_empty() {
log::trace!(
target: LOG_TARGET,
"{}: start connecting to peers {connect_to:?}",
self.protocol,
);
return Poll::Ready(Some(PeersetNotificationCommand::open_substream(connect_to)));
}
}
Poll::Pending
}
}