use crate::{
litep2p::{
peerstore::Peerstore,
shim::notification::peerset::{OpenResult, Peerset, PeersetCommand},
},
service::traits::{Direction, PeerStore, ValidationResult},
ProtocolName,
};
use futures::{FutureExt, StreamExt};
use litep2p::protocol::notification::NotificationError;
use rand::{
distributions::{Distribution, Uniform, WeightedIndex},
seq::IteratorRandom,
};
use sc_network_common::types::ReputationChange;
use sc_network_types::PeerId;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
/// Fuzz-test entry point: calls `sp_tracing::try_init_simple()` once and then executes a
/// fixed number of independent randomized scenarios, one after another.
///
/// Any failed assertion inside a scenario panics and therefore fails the test.
// NOTE(review): the test is compiled only with `debug_assertions` enabled, presumably because
// it relies on debug-only checks in the code under test — confirm.
#[tokio::test]
#[cfg(debug_assertions)]
async fn run() {
    /// How many scenarios a single invocation of the test goes through.
    const SCENARIOS: usize = 50;

    sp_tracing::try_init_simple();

    for _scenario in 0..SCENARIOS {
        test_once().await;
    }
}
/// Runs a single randomized scenario against a freshly constructed `Peerset`.
///
/// The scenario creates a `Peerstore` seeded with 0..=4 random bootnodes and a `Peerset` for
/// protocol `/notif/1` with randomized configuration, then performs 2500 randomly chosen
/// actions on it. In parallel it maintains a small model of the substream life cycle
/// (`opening` -> `open` -> `closing` -> `closed`) and asserts after each relevant action that
/// what the peerset requested or answered is consistent with that model.
///
/// # Panics
///
/// Panics if any consistency assertion fails, if the peerset stream terminates, or if a
/// command cannot be sent to the peerset.
#[cfg(debug_assertions)]
async fn test_once() {
    let mut rng = rand::thread_rng();

    // Peers that may be selected as the target of a fatal reputation change (action 7).
    let mut known_peers = HashSet::<PeerId>::new();

    // Test-side record of every peer that was handed to the peerset as reserved and not
    // removed again through action 11.
    let mut reserved_peers = HashSet::<PeerId>::new();

    // Reserved-only mode starts enabled in 1 out of 11 scenarios.
    let mut reserved_only = Uniform::new_inclusive(0, 10).sample(&mut rng) == 0;

    // Between 0 and 4 random bootnodes for the peer store.
    let bootnodes = (0..Uniform::new_inclusive(0, 4).sample(&mut rng))
        .map(|_| {
            let id = PeerId::random();
            known_peers.insert(id);
            id
        })
        .collect();

    let peerstore = Peerstore::new(bootnodes, None);
    let peer_store_handle = peerstore.handle();

    let (mut peerset, to_peerset) = Peerset::new(
        ProtocolName::from("/notif/1"),
        // NOTE(review): presumably the outbound and inbound slot limits — confirm against
        // the signature of `Peerset::new`.
        Uniform::new_inclusive(0, 25).sample(&mut rng),
        Uniform::new_inclusive(0, 25).sample(&mut rng),
        reserved_only,
        // Between 0 and 2 initially reserved peers.
        (0..Uniform::new_inclusive(0, 2).sample(&mut rng))
            .map(|_| {
                let id = PeerId::random();
                known_peers.insert(id);
                reserved_peers.insert(id);
                id
            })
            .collect(),
        Default::default(),
        Arc::clone(&peer_store_handle),
    );

    tokio::spawn(peerstore.run());

    // Model of the substream state of every peer the test has interacted with.
    let mut opening = HashMap::<PeerId, Direction>::new();
    let mut open = HashMap::<PeerId, Direction>::new();
    let mut closing = HashSet::<PeerId>::new();
    let mut closed = HashSet::<PeerId>::new();

    // The action loop is synchronous, so it is executed on the blocking thread pool.
    // Reaching the end of the loop without a panic means the scenario succeeded.
    tokio::task::spawn_blocking(move || {
        let mut rng = rand::thread_rng();

        // `action_weights[i]` is the relative probability of performing action `i` below.
        // The distribution never changes, so it is built once instead of per iteration.
        let action_weights =
            [300, 110, 110, 110, 110, 90, 70, 30, 110, 110, 110, 110, 20, 110, 50, 110];
        let action_picker = WeightedIndex::new(&action_weights).unwrap();

        for _ in 0..2500 {
            match action_picker.sample(&mut rng) {
                // Poll the peerset once without blocking and apply the command it produced,
                // if any, to the model.
                0 => match peerset.next().now_or_never() {
                    Some(Some(command)) => {
                        // Peers for which the peerset requests a new (outbound) substream.
                        for peer in command.open_peers {
                            opening.insert(peer, Direction::Outbound);
                            closed.remove(&peer);
                            assert!(!closing.contains(&peer));
                            assert!(!open.contains_key(&peer));
                        }
                        // Peers whose substream the peerset wants closed; they must
                        // currently be open.
                        for peer in command.close_peers {
                            assert!(closing.insert(peer));
                            assert!(open.remove(&peer).is_some());
                            assert!(!opening.contains_key(&peer));
                        }
                    },
                    Some(None) => panic!("peerset exited"),
                    None => {},
                },
                // Inbound substream from a brand-new peer. The peer is added to the peer
                // store but not to `known_peers`, so action 7 never targets it.
                1 => {
                    let new_peer = PeerId::random();
                    peer_store_handle.add_known_peer(new_peer);
                    match peerset.report_inbound_substream(new_peer) {
                        ValidationResult::Accept => {
                            opening.insert(new_peer, Direction::Inbound);
                        },
                        ValidationResult::Reject => {},
                    }
                },
                // A random opening substream finished opening; the peerset either accepts
                // it (peer becomes `open`) or rejects it (peer becomes `closing`).
                2 => {
                    if let Some(peer) = opening.keys().choose(&mut rng).copied() {
                        let direction = opening.remove(&peer).unwrap();
                        match peerset.report_substream_opened(peer, direction) {
                            OpenResult::Accept { .. } => {
                                assert!(open.insert(peer, direction).is_none());
                            },
                            OpenResult::Reject => {
                                assert!(closing.insert(peer));
                            },
                        }
                    }
                },
                // A random opening substream failed to open.
                3 => {
                    if let Some(peer) = opening.keys().choose(&mut rng).copied() {
                        let _ = opening.remove(&peer).unwrap();
                        peerset.report_substream_open_failure(peer, NotificationError::Rejected);
                    }
                },
                // A random open substream is reported as closed without a preceding close
                // request from the peerset.
                4 => {
                    if let Some(peer) = open.keys().choose(&mut rng).copied() {
                        let _ = open.remove(&peer).unwrap();
                        peerset.report_substream_closed(peer);
                        assert!(closed.insert(peer));
                    }
                },
                // A random closing substream finished closing.
                5 => {
                    if let Some(peer) = closing.iter().choose(&mut rng).copied() {
                        assert!(closing.remove(&peer));
                        assert!(closed.insert(peer));
                        peerset.report_substream_closed(peer);
                    }
                },
                // Ask the peerset to disconnect a random open peer. The model is not touched
                // here.
                // NOTE(review): presumably the peerset answers with `close_peers` on a later
                // poll (action 0) — confirm.
                6 => {
                    if let Some(peer) = open.keys().choose(&mut rng).copied() {
                        to_peerset.unbounded_send(PeersetCommand::DisconnectPeer { peer }).unwrap();
                    }
                },
                // Report a fatal reputation change for a random known peer.
                7 => {
                    if let Some(peer) = known_peers.iter().choose(&mut rng).copied() {
                        peer_store_handle.report_peer(peer, ReputationChange::new_fatal(""));
                    }
                },
                // Inbound substream arrives from a peer that is currently being opened as
                // outbound; if accepted, the model flips its direction to inbound.
                8 => {
                    let outbound_peers = opening
                        .iter()
                        .filter_map(|(peer, direction)| {
                            std::matches!(direction, Direction::Outbound).then_some(*peer)
                        })
                        .collect::<HashSet<_>>();
                    if let Some(peer) = outbound_peers.iter().choose(&mut rng).copied() {
                        match peerset.report_inbound_substream(peer) {
                            ValidationResult::Accept => {
                                opening.insert(peer, Direction::Inbound);
                            },
                            ValidationResult::Reject => {},
                        }
                    }
                },
                // Set (9) or add (10) reserved peers. Both actions pick the peers the same
                // way: a random number of peers from each model set plus five fresh peers,
                // minus everything already recorded in `reserved_peers`.
                action @ (9 | 10) => {
                    let num_open = Uniform::new_inclusive(0, open.len()).sample(&mut rng);
                    let num_opening = Uniform::new_inclusive(0, opening.len()).sample(&mut rng);
                    let num_closing = Uniform::new_inclusive(0, closing.len()).sample(&mut rng);
                    let num_closed = Uniform::new_inclusive(0, closed.len()).sample(&mut rng);

                    let peers = open
                        .keys()
                        .copied()
                        .choose_multiple(&mut rng, num_open)
                        .into_iter()
                        .chain(opening.keys().copied().choose_multiple(&mut rng, num_opening))
                        .chain(closing.iter().copied().choose_multiple(&mut rng, num_closing))
                        .chain(closed.iter().copied().choose_multiple(&mut rng, num_closed))
                        .chain((0..5).map(|_| {
                            let peer = PeerId::random();
                            known_peers.insert(peer);
                            peer_store_handle.add_known_peer(peer);
                            peer
                        }))
                        .filter(|peer| !reserved_peers.contains(peer))
                        .collect::<HashSet<_>>();

                    // NOTE(review): the test-side set is extended for both commands, also
                    // for `SetReservedPeers`. If that command replaces the reserved set
                    // inside the peerset, `reserved_peers` over-approximates it — confirm
                    // whether that is intended.
                    reserved_peers.extend(peers.iter().copied());

                    let command = if action == 9 {
                        PeersetCommand::SetReservedPeers { peers }
                    } else {
                        PeersetCommand::AddReservedPeers { peers }
                    };
                    to_peerset.unbounded_send(command).unwrap();
                },
                // Remove a random subset of the recorded reserved peers.
                11 => {
                    let num_to_remove =
                        Uniform::new_inclusive(0, reserved_peers.len()).sample(&mut rng);
                    let peers = reserved_peers
                        .iter()
                        .copied()
                        .choose_multiple(&mut rng, num_to_remove)
                        .into_iter()
                        .collect::<HashSet<_>>();
                    for peer in &peers {
                        assert!(reserved_peers.remove(peer));
                    }
                    to_peerset
                        .unbounded_send(PeersetCommand::RemoveReservedPeers { peers })
                        .unwrap();
                },
                // Toggle reserved-only mode. A failed send is treated as a test failure,
                // exactly like every other command sent in this loop.
                12 => {
                    reserved_only = !reserved_only;
                    to_peerset
                        .unbounded_send(PeersetCommand::SetReservedOnly { reserved_only })
                        .unwrap();
                },
                // A new peer becomes known to both the test and the peer store.
                13 => {
                    let new_peer = PeerId::random();
                    known_peers.insert(new_peer);
                    peer_store_handle.add_known_peer(new_peer);
                },
                // An inbound substream that is still opening gets rejected.
                14 => {
                    let inbound_peers = opening
                        .iter()
                        .filter_map(|(peer, direction)| {
                            std::matches!(direction, Direction::Inbound).then_some(*peer)
                        })
                        .collect::<HashSet<_>>();
                    if let Some(peer) = inbound_peers.iter().choose(&mut rng).copied() {
                        peerset.report_substream_rejected(peer);
                        opening.remove(&peer);
                    }
                },
                // Inbound substream arrives from a peer whose previous substream is closed.
                15 => {
                    if let Some(peer) = closed.iter().choose(&mut rng).copied() {
                        match peerset.report_inbound_substream(peer) {
                            ValidationResult::Accept => {
                                assert!(closed.remove(&peer));
                                opening.insert(peer, Direction::Inbound);
                            },
                            ValidationResult::Reject => {},
                        }
                    }
                },
                // `action_weights` has exactly 16 entries, so indices above 15 cannot occur.
                _ => unreachable!(),
            }
        }
    })
    .await
    .unwrap();
}