use crate::{
litep2p::{
peerstore::peerstore_handle_test,
shim::notification::peerset::{
Direction, OpenResult, PeerState, Peerset, PeersetCommand, Reserved,
},
},
service::traits::{self, ValidationResult},
ProtocolName,
};
use futures::prelude::*;
use litep2p::protocol::notification::NotificationError;
use sc_network_types::PeerId;
use std::{
collections::HashSet,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::Poll,
};
/// An inbound substream from a peer that is already being opened outbound is accepted
/// without changing the peer's state or the slot counters.
///
/// Three peers are registered in the test peerstore and the first command emitted by the
/// peerset is expected to open all of them. One of those peers then reports an inbound
/// substream while it is still in the outbound `Opening` state.
#[tokio::test]
async fn inbound_substream_for_outbound_peer() {
    let peerstore_handle = Arc::new(peerstore_handle_test());

    // Register three random peers with the peerstore and keep their ids around.
    let peers: Vec<_> = (0..3)
        .map(|_| {
            let id = PeerId::random();
            peerstore_handle.add_known_peer(id);
            id
        })
        .collect();
    let inbound_peer = peers[0];

    // NOTE(review): the two `25`s are presumably the slot limits and `false` the
    // reserved-only flag — confirm against `Peerset::new`.
    let (mut peerset, _to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        25,
        25,
        false,
        Default::default(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // State expected for `inbound_peer` both before and after the inbound report.
    let opening = PeerState::Opening { direction: Direction::Outbound(Reserved::No) };

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.close_peers.is_empty());
    assert_eq!(command.open_peers.len(), 3);
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 3);
    assert_eq!(peerset.peers().get(&inbound_peer), Some(&opening));

    // The inbound substream is accepted, yet nothing about the peer's bookkeeping changes.
    assert_eq!(peerset.report_inbound_substream(inbound_peer), ValidationResult::Accept);
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 3);
    assert_eq!(peerset.peers().get(&inbound_peer), Some(&opening));
}
/// Reserved peers that are removed from the reserved set while their outbound substreams
/// are still opening end up in the `Canceled` state.
///
/// The second command emitted after `RemoveReservedPeers` is expected to open and close
/// nothing.
#[tokio::test]
async fn canceled_peer_gets_banned() {
    sp_tracing::try_init_simple();

    let peerstore_handle = Arc::new(peerstore_handle_test());
    let reserved = HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]);
    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        0,
        0,
        true,
        reserved.clone(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // First command: only reserved peers are opened and no regular slots are consumed.
    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.close_peers.is_empty());
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);
    let opening = PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) };
    for opened in &command.open_peers {
        assert!(reserved.contains(opened));
        assert_eq!(peerset.peers().get(opened), Some(&opening));
    }

    // Drop every reserved peer while the substreams are still opening.
    let removal = PeersetCommand::RemoveReservedPeers { peers: reserved.clone() };
    to_peerset.unbounded_send(removal).unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.open_peers.is_empty());
    assert!(command.close_peers.is_empty());

    let canceled = PeerState::Canceled { direction: Direction::Outbound(Reserved::Yes) };
    for (_peer, state) in peerset.peers() {
        assert_eq!(state, &canceled);
    }
}
/// Reserved peers can be added, connected, removed and then re-added/removed again while
/// their substreams are closing.
///
/// Steps exercised:
/// 1. `AddReservedPeers` yields a command whose opened peers all belong to the added set.
/// 2. Every substream is reported open and the peers become `Connected`.
/// 3. `RemoveReservedPeers` yields a non-empty list of peers to close; they turn `Closing`.
/// 4. Adding and removing the same peers once more only toggles reserved-set membership:
///    nothing is opened or closed and the peers stay `Closing`.
#[tokio::test]
async fn peer_added_and_removed_from_peerset() {
    sp_tracing::try_init_simple();

    let peerstore_handle = Arc::new(peerstore_handle_test());
    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        0,
        0,
        true,
        Default::default(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // Expected states, all for outbound reserved peers.
    let opening = PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) };
    let connected = PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) };
    let closing = PeerState::Closing { direction: Direction::Outbound(Reserved::Yes) };

    // Step 1: add the reserved peers.
    let peers = HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]);
    to_peerset
        .unbounded_send(PeersetCommand::AddReservedPeers { peers: peers.clone() })
        .unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.close_peers.is_empty());
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);
    for opened in &command.open_peers {
        assert!(peers.contains(opened));
        assert!(peerset.reserved_peers().contains(opened));
        assert_eq!(peerset.peers().get(opened), Some(&opening));
    }

    // Step 2: report every substream as successfully opened.
    for peer in &peers {
        let result = peerset.report_substream_opened(*peer, traits::Direction::Outbound);
        assert!(matches!(result, OpenResult::Accept { .. }));
        assert_eq!(peerset.peers().get(peer), Some(&connected));
    }

    // Step 3: remove the reserved peers; the connected ones must be closed.
    to_peerset
        .unbounded_send(PeersetCommand::RemoveReservedPeers { peers: peers.clone() })
        .unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.open_peers.is_empty());
    assert!(!command.close_peers.is_empty());
    for closed in &command.close_peers {
        assert!(peers.contains(closed));
        assert!(!peerset.reserved_peers().contains(closed));
        assert_eq!(peerset.peers().get(closed), Some(&closing));
    }

    // Step 4a: re-add the peers while they are still closing.
    to_peerset
        .unbounded_send(PeersetCommand::AddReservedPeers { peers: peers.clone() })
        .unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.close_peers.is_empty());
    assert!(command.open_peers.is_empty());
    for peer in &peers {
        assert!(peerset.reserved_peers().contains(peer));
        assert_eq!(peerset.peers().get(peer), Some(&closing));
    }

    // Step 4b: remove them again; still nothing to open or close.
    to_peerset
        .unbounded_send(PeersetCommand::RemoveReservedPeers { peers: peers.clone() })
        .unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.open_peers.is_empty());
    assert!(command.close_peers.is_empty());
    for peer in &peers {
        assert!(!peerset.reserved_peers().contains(peer));
        assert_eq!(peerset.peers().get(peer), Some(&closing));
    }
}
/// `SetReservedPeers` with a completely disjoint set opens the three new reserved peers and
/// closes the three previously connected reserved peers.
#[tokio::test]
async fn set_reserved_peers() {
    sp_tracing::try_init_simple();

    let reserved = HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]);
    let peerstore_handle = Arc::new(peerstore_handle_test());
    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        25,
        25,
        true,
        reserved.clone(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // Expected states, all for outbound reserved peers.
    let opening = PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) };
    let connected = PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) };
    let closing = PeerState::Closing { direction: Direction::Outbound(Reserved::Yes) };

    // Initial command: the original reserved peers are opened.
    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.close_peers.is_empty());
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);
    for opened in &command.open_peers {
        assert!(reserved.contains(opened));
        assert!(peerset.reserved_peers().contains(opened));
        assert_eq!(peerset.peers().get(opened), Some(&opening));
    }

    // Connect all of the original reserved peers.
    for peer in &reserved {
        let result = peerset.report_substream_opened(*peer, traits::Direction::Outbound);
        assert!(matches!(result, OpenResult::Accept { .. }));
        assert_eq!(peerset.peers().get(peer), Some(&connected));
    }

    // Replace the reserved set with three brand-new peers.
    let new_reserved_peers =
        HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]);
    let replace = PeersetCommand::SetReservedPeers { peers: new_reserved_peers.clone() };
    to_peerset.unbounded_send(replace).unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };

    // All new peers are opened as reserved peers.
    assert_eq!(command.open_peers.len(), 3);
    for opened in &command.open_peers {
        assert!(new_reserved_peers.contains(opened));
        assert!(peerset.reserved_peers().contains(opened));
        assert_eq!(peerset.peers().get(opened), Some(&opening));
    }

    // All old peers are closed and no longer reserved.
    assert_eq!(command.close_peers.len(), 3);
    for closed in &command.close_peers {
        assert!(reserved.contains(closed));
        assert!(!peerset.reserved_peers().contains(closed));
        assert_eq!(peerset.peers().get(closed), Some(&closing));
    }
}
/// `SetReservedPeers` with a set that shares one peer with the current reserved set keeps
/// that peer connected: only the two new peers are opened and only the two dropped peers
/// are closed.
#[tokio::test]
async fn set_reserved_peers_one_peer_already_in_the_set() {
    sp_tracing::try_init_simple();

    let reserved = HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]);
    // Any member of the set will do; it is carried over into the new reserved set below.
    let common_peer = reserved.iter().copied().next().unwrap();
    let peerstore_handle = Arc::new(peerstore_handle_test());
    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        25,
        25,
        true,
        reserved.clone(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // Expected states, all for outbound reserved peers.
    let opening = PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) };
    let connected = PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) };
    let closing = PeerState::Closing { direction: Direction::Outbound(Reserved::Yes) };

    // Initial command: the original reserved peers are opened.
    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.close_peers.is_empty());
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);
    for opened in &command.open_peers {
        assert!(reserved.contains(opened));
        assert!(peerset.reserved_peers().contains(opened));
        assert_eq!(peerset.peers().get(opened), Some(&opening));
    }

    // Connect all of the original reserved peers.
    for peer in &reserved {
        let result = peerset.report_substream_opened(*peer, traits::Direction::Outbound);
        assert!(matches!(result, OpenResult::Accept { .. }));
        assert_eq!(peerset.peers().get(peer), Some(&connected));
    }

    // New reserved set: two fresh peers plus the shared one.
    let new_reserved_peers = HashSet::from_iter([PeerId::random(), PeerId::random(), common_peer]);
    let replace = PeersetCommand::SetReservedPeers { peers: new_reserved_peers.clone() };
    to_peerset.unbounded_send(replace).unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };

    // Only the two fresh peers need to be opened.
    assert_eq!(command.open_peers.len(), 2);
    for opened in &command.open_peers {
        assert!(new_reserved_peers.contains(opened));
        assert!(peerset.reserved_peers().contains(opened));
        assert_eq!(peerset.peers().get(opened), Some(&opening));
    }

    // Only the two dropped peers are closed; the shared peer must not be among them.
    assert_eq!(command.close_peers.len(), 2);
    for closed in &command.close_peers {
        assert!(reserved.contains(closed));
        if closed == &common_peer {
            panic!("common peer disconnected");
        }
        assert!(!peerset.reserved_peers().contains(closed));
        assert_eq!(peerset.peers().get(closed), Some(&closing));
    }

    for peer in &new_reserved_peers {
        assert!(peerset.reserved_peers().contains(peer));
    }

    // The shared peer keeps its existing connection.
    assert_eq!(peerset.peers().get(&common_peer), Some(&connected));
}
/// `AddReservedPeers` with a set that contains one already-connected reserved peer only
/// opens substreams to the two new peers and leaves the existing connection untouched.
#[tokio::test]
async fn add_reserved_peers_one_peer_already_in_the_set() {
    sp_tracing::try_init_simple();

    let peerstore_handle = Arc::new(peerstore_handle_test());
    // Three random peers, registered with the peerstore and used as the initial reserved set.
    let reserved = (0..3)
        .map(|_| {
            let peer = PeerId::random();
            peerstore_handle.add_known_peer(peer);
            peer
        })
        .collect::<Vec<_>>();
    let common_peer = reserved[0];

    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        25,
        25,
        true,
        reserved.iter().copied().collect(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // Initial command: all three reserved peers are opened without using regular slots.
    match peerset.next().await {
        Some(command) => {
            assert!(command.close_peers.is_empty());

            let out_peers = command.open_peers;
            assert_eq!(peerset.num_in(), 0);
            assert_eq!(peerset.num_out(), 0);
            assert_eq!(out_peers.len(), 3);

            for outbound_peer in &out_peers {
                assert!(reserved.contains(outbound_peer));
                assert!(peerset.reserved_peers().contains(outbound_peer));
                assert_eq!(
                    peerset.peers().get(outbound_peer),
                    Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) })
                );
            }
        },
        event => panic!("invalid event: {event:?}"),
    }

    // Report all substreams as opened so every reserved peer is `Connected`.
    for peer in &reserved {
        assert!(matches!(
            peerset.report_substream_opened(*peer, traits::Direction::Outbound),
            OpenResult::Accept { .. }
        ));
        assert_eq!(
            peerset.peers().get(peer),
            Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) })
        );
    }

    // Add two fresh peers together with one peer that is already reserved and connected.
    let new_reserved_peers = HashSet::from_iter([PeerId::random(), PeerId::random(), common_peer]);
    to_peerset
        .unbounded_send(PeersetCommand::AddReservedPeers { peers: new_reserved_peers.clone() })
        .unwrap();

    match peerset.next().await {
        Some(command) => {
            assert!(command.close_peers.is_empty());

            let out_peers = command.open_peers;
            assert_eq!(out_peers.len(), 2);
            // The already-connected peer must not be opened a second time. Because of this
            // assertion no per-peer `!= common_peer` guard is needed in the loop below.
            assert!(!out_peers.iter().any(|peer| peer == &common_peer));

            for peer in &out_peers {
                assert!(!reserved.contains(peer));
                assert!(peerset.reserved_peers().contains(peer));
                assert_eq!(
                    peerset.peers().get(peer),
                    Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }),
                );
            }
        },
        event => panic!("invalid event: {event:?}"),
    }

    // The shared peer keeps its existing connection.
    assert_eq!(
        peerset.peers().get(&common_peer),
        Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) })
    );
}
#[tokio::test]
async fn opening_peer_gets_canceled_and_disconnected() {
sp_tracing::try_init_simple();
let peerstore_handle = Arc::new(peerstore_handle_test());
let _known_peers = (0..1)
.map(|_| {
let peer = PeerId::random();
peerstore_handle.add_known_peer(peer);
peer
})
.collect::<Vec<_>>();
let num_connected = Arc::new(Default::default());
let (mut peerset, to_peerset) = Peerset::new(
ProtocolName::from("/notif/1"),
25,
25,
false,
Default::default(),
Arc::clone(&num_connected),
peerstore_handle,
);
assert_eq!(peerset.num_in(), 0);
assert_eq!(peerset.num_out(), 0);
let peer = match peerset.next().await {
Some(command) => {
assert!(command.close_peers.is_empty());
let out_peers = command.open_peers;
assert_eq!(peerset.num_in(), 0);
assert_eq!(peerset.num_out(), 1);
assert_eq!(out_peers.len(), 1);
for peer in &out_peers {
assert_eq!(
peerset.peers().get(&peer),
Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) })
);
}
out_peers[0]
},
event => panic!("invalid event: {event:?}"),
};
to_peerset.unbounded_send(PeersetCommand::DisconnectPeer { peer }).unwrap();
futures::future::poll_fn(|cx| match peerset.poll_next_unpin(cx) {
Poll::Pending => Poll::Ready(()),
_ => panic!("unexpected event"),
})
.await;
assert_eq!(
peerset.peers().get(&peer),
Some(&PeerState::Canceled { direction: Direction::Outbound(Reserved::No) })
);
assert_eq!(peerset.num_out(), 1);
assert!(std::matches!(
peerset.report_substream_opened(peer, traits::Direction::Outbound),
OpenResult::Reject { .. }
));
assert_eq!(
peerset.peers().get(&peer),
Some(&PeerState::Closing { direction: Direction::Outbound(Reserved::No) })
);
assert_eq!(num_connected.load(Ordering::SeqCst), 1);
assert_eq!(peerset.num_out(), 1);
peerset.report_substream_closed(peer);
assert_eq!(peerset.num_out(), 0);
assert_eq!(num_connected.load(Ordering::SeqCst), 0);
assert_eq!(peerset.peers().get(&peer), Some(&PeerState::Backoff));
}
/// A peer that was canceled while opening and whose substream then fails to open is moved
/// to `Backoff`, and the peerset emits no further command for it.
#[tokio::test]
async fn open_failure_for_canceled_peer() {
    sp_tracing::try_init_simple();

    let peerstore_handle = Arc::new(peerstore_handle_test());
    // Register a single known peer; the test expects exactly one peer to be opened below.
    peerstore_handle.add_known_peer(PeerId::random());

    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        25,
        25,
        false,
        Default::default(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    let peer = match peerset.next().await {
        Some(command) => {
            assert!(command.close_peers.is_empty());

            let out_peers = command.open_peers;
            assert_eq!(peerset.num_in(), 0);
            assert_eq!(peerset.num_out(), 1);
            assert_eq!(out_peers.len(), 1);

            for peer in &out_peers {
                assert_eq!(
                    peerset.peers().get(peer),
                    Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) })
                );
            }

            out_peers[0]
        },
        event => panic!("invalid event: {event:?}"),
    };

    // Ask the peerset to disconnect the peer while its substream is still opening.
    to_peerset.unbounded_send(PeersetCommand::DisconnectPeer { peer }).unwrap();

    // Polling must process the command without emitting anything.
    futures::future::poll_fn(|cx| match peerset.poll_next_unpin(cx) {
        Poll::Pending => Poll::Ready(()),
        _ => panic!("unexpected event"),
    })
    .await;

    assert_eq!(
        peerset.peers().get(&peer),
        Some(&PeerState::Canceled { direction: Direction::Outbound(Reserved::No) })
    );

    // The open attempt fails; the canceled peer goes straight to backoff.
    peerset.report_substream_open_failure(peer, NotificationError::NoConnection);
    assert_eq!(peerset.peers().get(&peer), Some(&PeerState::Backoff));

    // No follow-up command is produced for the failed peer.
    futures::future::poll_fn(|cx| match peerset.poll_next_unpin(cx) {
        Poll::Pending => Poll::Ready(()),
        _ => panic!("unexpected event"),
    })
    .await;
}
/// A peer whose inbound substream was accepted, then failed to open, stays in `Backoff`
/// even when a rejection for the same substream is reported afterwards.
#[tokio::test]
async fn peer_disconnected_when_being_validated_then_rejected() {
    sp_tracing::try_init_simple();

    // The sender half is kept alive for the duration of the test although it is never used.
    let (mut peerset, _to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        25,
        25,
        false,
        Default::default(),
        Default::default(),
        Arc::new(peerstore_handle_test()),
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    let backoff = PeerState::Backoff;
    let remote = PeerId::random();

    // The inbound substream is accepted for validation.
    let verdict = peerset.report_inbound_substream(remote);
    assert_eq!(verdict, ValidationResult::Accept);

    // The open attempt fails while the peer is still being validated.
    peerset.report_substream_open_failure(remote, NotificationError::NoConnection);
    assert_eq!(peerset.peers().get(&remote), Some(&backoff));

    // A late rejection must not change the state.
    peerset.report_substream_rejected(remote);
    assert_eq!(peerset.peers().get(&remote), Some(&backoff));
}
/// Reserved peers that are removed from the reserved set while still opening are kept as
/// regular outbound peers: their state becomes `Opening` with `Reserved::No` and they
/// account for three outbound slots.
#[tokio::test]
async fn removed_reserved_peer_kept_due_to_free_slots() {
    sp_tracing::try_init_simple();

    let peerstore_handle = Arc::new(peerstore_handle_test());
    let reserved = HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]);
    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        25,
        25,
        true,
        reserved.clone(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // First command: only reserved peers are opened and no regular slots are consumed.
    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.close_peers.is_empty());
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);
    let opening_reserved = PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) };
    for opened in &command.open_peers {
        assert!(reserved.contains(opened));
        assert_eq!(peerset.peers().get(opened), Some(&opening_reserved));
    }

    // Remove every reserved peer while the substreams are still opening.
    let removal = PeersetCommand::RemoveReservedPeers { peers: reserved.clone() };
    to_peerset.unbounded_send(removal).unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.open_peers.is_empty());
    assert!(command.close_peers.is_empty());

    // The peers are kept, now tracked as non-reserved outbound peers.
    let opening_regular = PeerState::Opening { direction: Direction::Outbound(Reserved::No) };
    for (_peer, state) in peerset.peers() {
        assert_eq!(state, &opening_regular);
    }
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 3);
}
/// `SetReservedPeers` on a peerset with three connected regular peers, one of which is part
/// of the new reserved set: nothing is closed, the two unknown reserved peers are opened,
/// and the outbound slot count drops from three to two.
#[tokio::test]
async fn set_reserved_peers_but_available_slots() {
    sp_tracing::try_init_simple();

    let peerstore_handle = Arc::new(peerstore_handle_test());
    // Three random peers registered with the peerstore; none of them is reserved initially.
    let known_peers = (0..3)
        .map(|_| {
            let peer = PeerId::random();
            peerstore_handle.add_known_peer(peer);
            peer
        })
        .collect::<Vec<_>>();
    // This peer is later included in the reserved set.
    let common_peer = known_peers[0];

    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        25,
        25,
        false,
        Default::default(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // Initial command: the three known peers are opened as regular outbound peers.
    match peerset.next().await {
        Some(command) => {
            assert!(command.close_peers.is_empty());

            let out_peers = command.open_peers;
            assert_eq!(out_peers.len(), 3);

            for peer in &out_peers {
                assert_eq!(
                    peerset.peers().get(peer),
                    Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) })
                );
            }
        },
        event => panic!("invalid event: {event:?}"),
    }
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 3);

    // Report all substreams as opened.
    for peer in &known_peers {
        assert!(matches!(
            peerset.report_substream_opened(*peer, traits::Direction::Outbound),
            OpenResult::Accept { .. }
        ));
        assert_eq!(
            peerset.peers().get(peer),
            Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::No) })
        );
    }

    // Reserve one connected peer plus two peers the peerset has not seen before.
    let reserved_peers = HashSet::from_iter([common_peer, PeerId::random(), PeerId::random()]);
    to_peerset
        .unbounded_send(PeersetCommand::SetReservedPeers { peers: reserved_peers.clone() })
        .unwrap();

    match peerset.next().await {
        Some(command) => {
            assert!(command.close_peers.is_empty());

            // Three original peers plus the two new reserved ones.
            assert_eq!(peerset.peers().len(), 5);
            assert_eq!(peerset.num_in(), 0);
            // One fewer outbound slot is counted after `common_peer` is reserved.
            assert_eq!(peerset.num_out(), 2);
            assert_eq!(peerset.reserved_peers().len(), 3);

            let peers = command.open_peers;
            assert_eq!(peers.len(), 2);
            // The already-connected peer must not be opened again.
            assert!(!peers.contains(&common_peer));

            for peer in &peers {
                assert!(reserved_peers.contains(peer));
                assert!(peerset.reserved_peers().contains(peer));
                assert_eq!(
                    peerset.peers().get(peer),
                    Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }),
                );
            }
        },
        event => panic!("invalid event: {event:?}"),
    }
}
#[tokio::test]
async fn set_reserved_peers_move_previously_reserved() {
sp_tracing::try_init_simple();
let peerstore_handle = Arc::new(peerstore_handle_test());
let known_peers = (0..3)
.map(|_| {
let peer = PeerId::random();
peerstore_handle.add_known_peer(peer);
peer
})
.collect::<Vec<_>>();
let common_peer = *known_peers.iter().next().unwrap();
let moved_peers = known_peers.iter().skip(1).copied().collect::<HashSet<_>>();
let known_peers = known_peers.into_iter().collect::<HashSet<_>>();
assert_eq!(moved_peers.len(), 2);
let (mut peerset, to_peerset) = Peerset::new(
ProtocolName::from("/notif/1"),
25,
25,
false,
known_peers.clone(),
Default::default(),
peerstore_handle,
);
assert_eq!(peerset.num_in(), 0usize);
assert_eq!(peerset.num_out(), 0usize);
match peerset.next().await {
Some(command) => {
assert!(command.close_peers.is_empty());
let out_peers = command.open_peers;
assert_eq!(out_peers.len(), 3);
for peer in &out_peers {
assert_eq!(
peerset.peers().get(&peer),
Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) })
);
}
},
event => panic!("invalid event: {event:?}"),
}
assert_eq!(peerset.num_in(), 0usize);
assert_eq!(peerset.num_out(), 0usize);
assert_eq!(peerset.reserved_peers().len(), 3usize);
for peer in &known_peers {
assert!(std::matches!(
peerset.report_substream_opened(*peer, traits::Direction::Outbound),
OpenResult::Accept { .. }
));
assert_eq!(
peerset.peers().get(peer),
Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) })
);
}
let reserved_peers = HashSet::from_iter([common_peer, PeerId::random(), PeerId::random()]);
to_peerset
.unbounded_send(PeersetCommand::SetReservedPeers { peers: reserved_peers.clone() })
.unwrap();
let Some(command) = peerset.next().await else {
panic!("expected command");
};
assert!(command.close_peers.is_empty());
assert_eq!(peerset.peers().len(), 5);
assert_eq!(peerset.num_in(), 0usize);
assert_eq!(peerset.num_out(), 2usize);
assert_eq!(peerset.reserved_peers().len(), 3usize);
for (peer, state) in peerset.peers() {
if peer == &common_peer {
assert_eq!(
state,
&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) }
);
continue;
}
if reserved_peers.contains(peer) {
assert_eq!(
peerset.peers().get(peer),
Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }),
);
continue;
}
if moved_peers.contains(peer) {
assert_eq!(
state,
&PeerState::Connected { direction: Direction::Outbound(Reserved::No) }
);
continue;
}
panic!("Invalid state peer={peer:?} state={state:?}");
}
let peers = command.open_peers;
assert_eq!(peers.len(), 2);
assert!(!peers.contains(&common_peer));
for peer in &peers {
assert!(reserved_peers.contains(peer));
assert!(peerset.reserved_peers().contains(peer));
assert_eq!(
peerset.peers().get(peer),
Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }),
);
}
}
/// `SetReservedPeers` that drops two connected reserved peers on a peerset created with
/// `0`/`0` numeric limits: the dropped peers are closed rather than kept, while the two new
/// reserved peers are opened and the shared peer is left alone.
#[tokio::test]
async fn set_reserved_peers_cannot_move_previously_reserved() {
    sp_tracing::try_init_simple();

    let peerstore_handle = Arc::new(peerstore_handle_test());
    // Three random peers registered with the peerstore; all start out as reserved peers.
    let known_peers: Vec<_> = (0..3)
        .map(|_| {
            let id = PeerId::random();
            peerstore_handle.add_known_peer(id);
            id
        })
        .collect();

    let common_peer = known_peers[0];
    let moved_peers: HashSet<_> = known_peers[1..].iter().copied().collect();
    let known_peers: HashSet<_> = known_peers.into_iter().collect();
    assert_eq!(moved_peers.len(), 2);

    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        0,
        0,
        false,
        known_peers.clone(),
        Default::default(),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);

    // Expected states, all for outbound reserved peers.
    let opening = PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) };
    let connected = PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) };
    let closing = PeerState::Closing { direction: Direction::Outbound(Reserved::Yes) };

    // Initial command: all three reserved peers are opened.
    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };
    assert!(command.close_peers.is_empty());
    assert_eq!(command.open_peers.len(), 3);
    for opened in &command.open_peers {
        assert_eq!(peerset.peers().get(opened), Some(&opening));
    }
    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);
    assert_eq!(peerset.reserved_peers().len(), 3);

    // Report all substreams as opened.
    for peer in &known_peers {
        let result = peerset.report_substream_opened(*peer, traits::Direction::Outbound);
        assert!(matches!(result, OpenResult::Accept { .. }));
        assert_eq!(peerset.peers().get(peer), Some(&connected));
    }

    // New reserved set: the shared peer plus two peers the peerset has not seen before.
    let reserved_peers = HashSet::from_iter([common_peer, PeerId::random(), PeerId::random()]);
    let replace = PeersetCommand::SetReservedPeers { peers: reserved_peers.clone() };
    to_peerset.unbounded_send(replace).unwrap();

    let command = match peerset.next().await {
        Some(command) => command,
        event => panic!("invalid event: {event:?}"),
    };

    // The two new reserved peers are opened.
    assert_eq!(command.open_peers.len(), 2);
    for opened in &command.open_peers {
        assert!(reserved_peers.contains(opened));
        assert!(peerset.reserved_peers().contains(opened));
        assert_eq!(peerset.peers().get(opened), Some(&opening));
    }

    // The two dropped peers are closed; the shared peer is not among them.
    assert_eq!(command.close_peers.len(), 2);
    for closed in &command.close_peers {
        assert_ne!(common_peer, *closed);
        assert_eq!(peerset.peers().get(closed), Some(&closing));
    }

    assert_eq!(peerset.num_in(), 0);
    assert_eq!(peerset.num_out(), 0);
    assert_eq!(peerset.reserved_peers().len(), 3);
}
/// In reserved-only mode, inbound substreams from non-reserved peers are rejected; once
/// `SetReservedOnly { reserved_only: false }` is sent, the same peers are opened as regular
/// outbound peers.
///
/// The test runs in three phases:
/// 1. Three reserved peers are opened and connected; the shared counter reads 3.
/// 2. Three non-reserved peers are rejected at every step and end up in `Backoff`; the
///    counter stays at 3.
/// 3. After the backoff states are reset by hand and reserved-only mode is switched off,
///    the three peers are opened; two connect and one fails, leaving the counter at 5.
#[tokio::test]
async fn reserved_only_rejects_non_reserved_peers() {
    sp_tracing::try_init_simple();

    let peerstore_handle = Arc::new(peerstore_handle_test());
    let reserved_peers = HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]);

    // Counter shared with the peerset so the test can observe the connected-peer count.
    let connected_peers = Arc::new(AtomicUsize::new(0));
    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        3,
        3,
        true,
        reserved_peers.clone(),
        Arc::clone(&connected_peers),
        peerstore_handle,
    );
    assert_eq!(peerset.num_in(), 0usize);
    assert_eq!(peerset.num_out(), 0usize);

    // Phase 1: connect the reserved peers.
    {
        match peerset.next().await {
            Some(command) => {
                assert!(command.close_peers.is_empty());

                let out_peers = command.open_peers;
                assert_eq!(peerset.num_in(), 0usize);
                assert_eq!(peerset.num_out(), 0usize);

                for outbound_peer in &out_peers {
                    assert!(reserved_peers.contains(outbound_peer));
                    assert_eq!(
                        peerset.peers().get(outbound_peer),
                        Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) })
                    );
                }
            },
            event => panic!("invalid event: {event:?}"),
        }

        for peer in &reserved_peers {
            assert!(matches!(
                peerset.report_substream_opened(*peer, traits::Direction::Outbound),
                OpenResult::Accept { .. }
            ));
            assert_eq!(
                peerset.peers().get(peer),
                Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) })
            );
        }
        assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize);
    }

    // Phase 2: non-reserved peers are rejected at every stage.
    let normal_peers: Vec<PeerId> = vec![PeerId::random(), PeerId::random(), PeerId::random()];
    {
        // Inbound substreams are rejected outright.
        for peer in &normal_peers {
            let result = peerset.report_inbound_substream(*peer);
            assert_eq!(result, ValidationResult::Reject);
            assert_eq!(peerset.peers().get(peer), Some(&PeerState::Disconnected));
        }
        assert_eq!(peerset.num_in(), 0usize);
        assert_eq!(peerset.num_out(), 0usize);

        // A substream reported as opened anyway is rejected as well.
        for peer in &normal_peers {
            let result = peerset.report_substream_opened(*peer, traits::Direction::Inbound);
            assert_eq!(result, OpenResult::Reject);
            assert_eq!(peerset.peers().get(peer), Some(&PeerState::Disconnected));
        }
        assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize);

        // Closing the rejected substreams puts the peers into backoff.
        for peer in &normal_peers {
            peerset.report_substream_closed(*peer);
            assert_eq!(peerset.peers().get(peer), Some(&PeerState::Backoff));
        }
        assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize);
    }

    // Reset the backed-off peers to `Disconnected` by hand while verifying every tracked peer.
    // NOTE(review): presumably done so the test does not have to wait for the backoff to
    // expire before the peers can be selected again — confirm.
    for (peer, state) in peerset.peers_mut() {
        if normal_peers.contains(peer) {
            match state {
                PeerState::Backoff => *state = PeerState::Disconnected,
                state => panic!("invalid state peer={peer:?} state={state:?}"),
            }
        } else if reserved_peers.contains(peer) {
            match state {
                PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) } => {},
                state => panic!("invalid state peer={peer:?} state={state:?}"),
            }
        } else {
            panic!("invalid peer={peer:?} not present");
        }
    }

    // Phase 3: with reserved-only mode disabled the normal peers are accepted.
    {
        to_peerset
            .unbounded_send(PeersetCommand::SetReservedOnly { reserved_only: false })
            .unwrap();

        match peerset.next().await {
            Some(command) => {
                assert!(command.close_peers.is_empty());

                let peers = command.open_peers;
                assert_eq!(peers.len(), 3);

                for peer in &peers {
                    assert!(!reserved_peers.contains(peer));
                    assert_eq!(
                        peerset.peers().get(peer),
                        Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) })
                    );
                    assert!(normal_peers.contains(peer));
                }
            },
            event => panic!("invalid event: {event:?}"),
        }
        assert_eq!(peerset.num_in(), 0usize);
        assert_eq!(peerset.num_out(), 3usize);

        // Inbound substreams from peers that are already opening outbound are accepted and
        // leave the state untouched.
        for peer in &normal_peers {
            let result = peerset.report_inbound_substream(*peer);
            assert_eq!(result, ValidationResult::Accept);
            assert_eq!(
                peerset.peers().get(peer),
                Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) })
            );
        }
        assert_eq!(peerset.num_in(), 0usize);
        assert_eq!(peerset.num_out(), 3usize);
        assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize);

        // Two of the substreams open successfully, the third one fails.
        let (success, failure) = normal_peers.split_at(2);
        for peer in success {
            assert!(matches!(
                peerset.report_substream_opened(*peer, traits::Direction::Outbound),
                OpenResult::Accept { .. }
            ));
            assert_eq!(
                peerset.peers().get(peer),
                Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::No) })
            );
        }

        let failure = failure[0];
        peerset.report_substream_open_failure(failure, NotificationError::ChannelClogged);
        assert_eq!(peerset.peers().get(&failure), Some(&PeerState::Backoff));

        // The failed peer releases its slot; three reserved plus two regular peers remain.
        assert_eq!(peerset.num_in(), 0usize);
        assert_eq!(peerset.num_out(), 2usize);
        assert_eq!(connected_peers.load(Ordering::Relaxed), 5usize);
    }
}