use std::{fmt, time::Duration};
use multiaddr::Multiaddr;
#[cfg(feature = "metrics")]
use crate::peer_manager::metrics;
use crate::{
net_address::{MultiaddressesWithStats, PeerAddressSource},
peer_manager::{
NodeId,
PeerFeatures,
PeerFlags,
PeerManagerError,
ThisPeerIdentity,
peer::Peer,
peer_id::PeerId,
peer_storage_sql::PeerStorageSql,
},
types::{CommsDatabase, CommsPublicKey, TransportProtocol},
};
/// Manages the peer list for this node.
///
/// Thin, cloneable facade over the SQL-backed peer storage. Also carries the
/// set of transport protocols this node supports, which is passed to the
/// storage queries that select dial candidates and random peers.
#[derive(Clone)]
pub struct PeerManager {
    // SQL-backed store holding all known peers (cloning shares the same store).
    peer_storage_sql: PeerStorageSql,
    // Transports this node supports; forwarded to `get_available_dial_candidates`
    // and `random_peers` queries.
    transport_protocols: Vec<TransportProtocol>,
}
impl PeerManager {
    /// Creates a new `PeerManager` backed by `database`, building the indexed
    /// SQL peer storage layer.
    ///
    /// # Errors
    /// Returns an error if the storage layer cannot be initialised.
    pub fn new(
        database: CommsDatabase,
        transport_protocols: Vec<TransportProtocol>,
    ) -> Result<PeerManager, PeerManagerError> {
        let peer_storage_sql = PeerStorageSql::new_indexed(database)?;
        Ok(Self {
            peer_storage_sql,
            transport_protocols,
        })
    }

    /// Returns this node's own peer identity from storage.
    pub fn this_peer_identity(&self) -> ThisPeerIdentity {
        self.peer_storage_sql.this_peer_identity()
    }

    /// Returns the number of peers held in storage.
    pub async fn count(&self) -> usize {
        self.peer_storage_sql.count()
    }

    /// Refreshes the `peer_list_size` gauge from the current peer count.
    /// Shared by the mutating operations so the metric stays in sync.
    #[cfg(feature = "metrics")]
    async fn update_peer_list_size_metric(&self) {
        let count = self.count().await;
        #[allow(clippy::cast_possible_wrap)]
        metrics::peer_list_size().set(count as i64);
    }

    /// Adds `peer` to storage, or updates the stored record if it already
    /// exists. Returns the stored peer's id.
    pub async fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, PeerManagerError> {
        let peer_id = self.peer_storage_sql.add_or_update_peer(peer)?;
        #[cfg(feature = "metrics")]
        self.update_peer_list_size_metric().await;
        Ok(peer_id)
    }

    /// Soft-deletes the peer identified by `node_id` in storage.
    pub async fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
        self.peer_storage_sql.soft_delete_peer(node_id)?;
        #[cfg(feature = "metrics")]
        self.update_peer_list_size_metric().await;
        Ok(())
    }

    /// Fetches the peers matching the given node ids.
    pub async fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql.get_peers_by_node_ids(node_ids)
    }

    /// Fetches the public keys of the peers matching the given node ids.
    pub async fn get_peer_public_keys_by_node_ids(
        &self,
        node_ids: &[NodeId],
    ) -> Result<Vec<CommsPublicKey>, PeerManagerError> {
        self.peer_storage_sql.get_peer_public_keys_by_node_ids(node_ids)
    }

    /// Returns all currently banned peers.
    pub async fn get_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql.get_banned_peers()
    }

    /// Looks up a peer by node id, returning `Ok(None)` if it is not stored.
    pub async fn find_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
        self.peer_storage_sql.get_peer_by_node_id(node_id)
    }

    /// Returns the stored seed peers.
    pub async fn get_seed_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql.get_seed_peers()
    }

    /// Looks up a peer by public key, returning `Ok(None)` if it is not stored.
    pub async fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, PeerManagerError> {
        self.peer_storage_sql.find_by_public_key(public_key)
    }

    /// Delegates a prefix search over stored peers to the storage layer.
    pub async fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql.find_all_starts_with(partial)
    }

    /// Returns true if a peer with the given public key exists in storage.
    pub async fn exists(&self, public_key: &CommsPublicKey) -> Result<bool, PeerManagerError> {
        self.peer_storage_sql.exists_public_key(public_key)
    }

    /// Returns true if a peer with the given node id exists in storage.
    pub async fn exists_node_id(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
        self.peer_storage_sql.exists_node_id(node_id)
    }

    /// Returns all peers, optionally filtered by `features`.
    pub async fn all(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql.all(features)
    }

    /// Returns peers that are candidates for dialing over this node's
    /// configured transport protocols.
    ///
    /// `exclude_node_ids` are never returned; `limit` caps the result size;
    /// `exclude_failed` and `randomize` are forwarded to the storage query.
    pub async fn get_available_dial_candidates(
        &self,
        exclude_node_ids: &[NodeId],
        limit: Option<usize>,
        exclude_failed: bool,
        randomize: bool,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql.get_available_dial_candidates(
            exclude_node_ids,
            limit,
            &self.transport_protocols,
            exclude_failed,
            randomize,
        )
    }

    /// Selects up to `n` peers for discovery syncing, excluding `excluded_peers`
    /// and optionally filtering by `features` / external addresses only.
    pub async fn discovery_syncing(
        &self,
        n: usize,
        excluded_peers: &[NodeId],
        features: Option<PeerFeatures>,
        external_addresses_only: bool,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql
            .discovery_syncing(n, excluded_peers, features, external_addresses_only)
    }

    /// Adds or updates a peer that was observed online, recording its addresses
    /// together with the given address `source`.
    pub async fn add_or_update_online_peer(
        &self,
        pubkey: &CommsPublicKey,
        node_id: &NodeId,
        addresses: &[Multiaddr],
        peer_features: &PeerFeatures,
        source: &PeerAddressSource,
    ) -> Result<Peer, PeerManagerError> {
        self.peer_storage_sql
            .add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)
    }

    /// Direct identity lookup by node id.
    ///
    /// `PeerNotFound` and `BannedPeer` storage errors are mapped to `Ok(None)`;
    /// all other errors are propagated.
    pub async fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
        match self.peer_storage_sql.direct_identity_node_id(node_id) {
            Ok(peer) => Ok(Some(peer)),
            Err(PeerManagerError::PeerNotFound(_)) | Err(PeerManagerError::BannedPeer) => Ok(None),
            Err(err) => Err(err),
        }
    }

    /// Direct identity lookup by public key.
    ///
    /// `PeerNotFound` and `BannedPeer` storage errors are mapped to `Ok(None)`;
    /// all other errors are propagated.
    pub async fn direct_identity_public_key(
        &self,
        public_key: &CommsPublicKey,
    ) -> Result<Option<Peer>, PeerManagerError> {
        match self.peer_storage_sql.direct_identity_public_key(public_key) {
            Ok(peer) => Ok(Some(peer)),
            Err(PeerManagerError::PeerNotFound(_)) | Err(PeerManagerError::BannedPeer) => Ok(None),
            Err(err) => Err(err),
        }
    }

    /// Returns every peer that is neither banned nor deleted.
    pub async fn get_not_banned_or_deleted_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql.get_not_banned_or_deleted_peers()
    }

    /// Returns up to `n` random peers reachable over this node's transports,
    /// excluding `excluded` and optionally filtering by `flags`.
    pub async fn random_peers(
        &self,
        n: usize,
        excluded: &[NodeId],
        flags: Option<PeerFlags>,
    ) -> Result<Vec<Peer>, PeerManagerError> {
        self.peer_storage_sql
            .random_peers(n, excluded, flags, &self.transport_protocols)
    }

    /// Unbans the peer with the given node id.
    pub async fn unban_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
        self.peer_storage_sql.unban_peer(node_id)
    }

    /// Unbans all peers, returning the number affected.
    pub async fn unban_all_peers(&self) -> Result<usize, PeerManagerError> {
        self.peer_storage_sql.unban_all_peers()
    }

    /// Resets offline status on non-wallet peers, returning the number affected.
    pub async fn reset_offline_non_wallet_peers(&self) -> Result<usize, PeerManagerError> {
        self.peer_storage_sql.reset_offline_non_wallet_peers()
    }

    /// Bans the peer with the given public key for `duration`, recording
    /// `reason`. Returns the node id of the banned peer.
    pub async fn ban_peer(
        &self,
        public_key: &CommsPublicKey,
        duration: Duration,
        reason: String,
    ) -> Result<NodeId, PeerManagerError> {
        self.peer_storage_sql.ban_peer(public_key, duration, reason)
    }

    /// Bans the peer with the given node id for `duration`, recording `reason`.
    pub async fn ban_peer_by_node_id(
        &self,
        node_id: &NodeId,
        duration: Duration,
        reason: String,
    ) -> Result<NodeId, PeerManagerError> {
        self.peer_storage_sql.ban_peer_by_node_id(node_id, duration, reason)
    }

    /// Returns true if the peer with the given node id is currently banned.
    pub async fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
        self.peer_storage_sql.is_peer_banned(node_id)
    }

    /// Returns the features of the peer with the given node id.
    ///
    /// # Errors
    /// Returns `PeerNotFound` if the peer is not stored.
    pub async fn get_peer_features(&self, node_id: &NodeId) -> Result<PeerFeatures, PeerManagerError> {
        let peer = self
            .find_by_node_id(node_id)
            .await?
            // ok_or_else: only construct the error when the lookup misses
            .ok_or_else(|| PeerManagerError::peer_not_found(node_id))?;
        Ok(peer.features)
    }

    /// Returns the address list (with stats) of the peer with the given node id.
    ///
    /// # Errors
    /// Returns `PeerNotFound` if the peer is not stored.
    pub async fn get_peer_multi_addresses(
        &self,
        node_id: &NodeId,
    ) -> Result<MultiaddressesWithStats, PeerManagerError> {
        let peer = self
            .find_by_node_id(node_id)
            .await?
            .ok_or_else(|| PeerManagerError::peer_not_found(node_id))?;
        Ok(peer.addresses)
    }

    /// Returns `(node_id, addresses)` pairs for the given node ids.
    ///
    /// # Errors
    /// Returns a `ProcessError` when `node_ids` is empty, and a peers-not-found
    /// error when none of the requested peers exist in storage.
    pub async fn get_peers_multi_addresses(
        &self,
        node_ids: &[NodeId],
    ) -> Result<Vec<(NodeId, MultiaddressesWithStats)>, PeerManagerError> {
        if node_ids.is_empty() {
            return Err(PeerManagerError::ProcessError(
                "NodeId list cannot be empty".to_string(),
            ));
        }
        let peers = self.get_peers_by_node_ids(node_ids).await?;
        if peers.is_empty() {
            return Err(PeerManagerError::peers_not_found(node_ids));
        }
        let results = peers.into_iter().map(|p| (p.node_id, p.addresses)).collect::<Vec<_>>();
        Ok(results)
    }

    /// Stores a metadata entry for the peer under `key`, returning the previous
    /// value for that key if one was set.
    pub async fn set_peer_metadata(
        &self,
        node_id: &NodeId,
        key: u8,
        data: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, PeerManagerError> {
        self.peer_storage_sql.set_peer_metadata(node_id, key, data)
    }
}
impl fmt::Debug for PeerManager {
    /// Debug output deliberately elides the storage internals.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "PeerManager {{ peer_storage: ... }}")
    }
}
/// Builds a random test `Peer` with 1-3 random public-looking `/ip4/.../tcp/...`
/// addresses and a signed identity claim. When `ban_flag` is set the peer is
/// banned for 1000 seconds. The first address is marked as seen just now.
#[cfg(test)]
pub fn create_test_peer(ban_flag: bool, features: PeerFeatures) -> Peer {
    use rand::{Rng, rngs::OsRng};

    use crate::peer_manager::PeerFlags;

    // Fresh random identity for the peer.
    let (_sk, pk) = CommsPublicKey::random_keypair(&mut OsRng);
    let node_id = NodeId::from_key(&pk);
    let mut addresses = Vec::new();
    for _i in 1..=rand::thread_rng().gen_range(1..4) {
        let n = [
            // First octet skips 0, 10 (private 10.0.0.0/8) and 127 (loopback).
            match rand::thread_rng().gen_range(0..3) {
                0 => rand::thread_rng().gen_range(1..10),
                1 => rand::thread_rng().gen_range(11..127),
                _ => rand::thread_rng().gen_range(128..255),
            },
            rand::thread_rng().gen_range(1..255),
            rand::thread_rng().gen_range(1..255),
            rand::thread_rng().gen_range(1..255),
            // TCP port
            rand::thread_rng().gen_range(5000..9000),
        ];
        let address = format!("/ip4/{}.{}.{}.{}/tcp/{}", n[0], n[1], n[2], n[3], n[4])
            .parse::<Multiaddr>()
            .unwrap();
        addresses.push(address);
    }
    let net_addresses = MultiaddressesWithStats::from_addresses_with_source(
        addresses.clone(),
        &create_peer_address_source_with_claim(addresses, features),
    );
    let mut peer = Peer::new(
        pk,
        node_id,
        net_addresses,
        PeerFlags::default(),
        features,
        Default::default(),
        Default::default(),
    );
    if ban_flag {
        peer.ban_for(Duration::from_secs(1000), "".to_string());
    }
    // A plain `&mut` borrow suffices here; the previous `std::borrow::BorrowMut`
    // trait import was unnecessary for accessing a struct field.
    let good_addresses = &mut peer.addresses;
    let good_address = good_addresses.addresses().first().unwrap().address().clone();
    good_addresses.mark_last_seen_now(&good_address);
    peer
}
/// Produces a random 56-character host string drawn from the base32 alphabet
/// (`a-z`, `2-7`), the character set used by onion v3 host names.
#[cfg(test)]
fn random_onion3_host() -> String {
    use rand::{Rng, distributions::Uniform};

    const LEN: usize = 56;
    const B32: &[u8; 32] = b"abcdefghijklmnopqrstuvwxyz234567";

    let mut rng = rand::thread_rng();
    let dist = Uniform::from(0..B32.len());
    (0..LEN)
        .map(|_| char::from(*B32.get(rng.sample(dist)).expect("Index out of bounds")))
        .collect()
}
/// Builds a random test `Peer` whose 1-3 addresses are `/onion3/<host>:<port>`
/// multiaddrs, with a signed identity claim. When `ban_flag` is set the peer is
/// banned for 1000 seconds. The first address is marked as seen just now.
#[cfg(test)]
pub fn create_test_peer_with_onion_address(ban_flag: bool, features: PeerFeatures) -> Peer {
    use rand::{Rng, rngs::OsRng};

    use crate::peer_manager::PeerFlags;

    // Fresh random identity for the peer.
    let (_sk, pk) = CommsPublicKey::random_keypair(&mut OsRng);
    let node_id = NodeId::from_key(&pk);
    let mut addresses = Vec::new();
    for _i in 1..=rand::thread_rng().gen_range(1..4) {
        use std::str::FromStr;
        let host = random_onion3_host();
        // Unprivileged port range.
        let port = rand::thread_rng().gen_range(1024..=65535);
        let addr_str = format!("/onion3/{}:{}", host, port);
        let address = Multiaddr::from_str(&addr_str).expect("valid onion3 multiaddr");
        addresses.push(address);
    }
    let net_addresses = MultiaddressesWithStats::from_addresses_with_source(
        addresses.clone(),
        &create_peer_address_source_with_claim(addresses, features),
    );
    let mut peer = Peer::new(
        pk,
        node_id,
        net_addresses,
        PeerFlags::default(),
        features,
        Default::default(),
        Default::default(),
    );
    if ban_flag {
        peer.ban_for(Duration::from_secs(1000), "".to_string());
    }
    // A plain `&mut` borrow suffices here; the previous `std::borrow::BorrowMut`
    // trait import was unnecessary for accessing a struct field.
    let good_addresses = &mut peer.addresses;
    let good_address = good_addresses.addresses().first().unwrap().address().clone();
    good_addresses.mark_last_seen_now(&good_address);
    peer
}
/// Same as [`create_test_peer`], but additionally attaches a shuffled set of
/// internal/non-routable addresses to the generated peer.
#[cfg(test)]
pub fn create_test_peer_add_internal_addresses(ban_flag: bool, features: PeerFeatures) -> Peer {
    let mut test_peer = create_test_peer(ban_flag, features);
    add_internal_addresses(&mut test_peer);
    test_peer
}
/// Builds a test `Peer` that starts with an empty address list and ends up with
/// ONLY internal/non-routable addresses. When `ban_flag` is set the peer is
/// banned for 1000 seconds.
#[cfg(test)]
pub fn create_test_peer_internal_addresses_only(ban_flag: bool, features: PeerFeatures) -> Peer {
    use rand::rngs::OsRng;

    use crate::peer_manager::PeerFlags;

    // Random identity; no addresses yet.
    let (_secret, public_key) = CommsPublicKey::random_keypair(&mut OsRng);
    let derived_node_id = NodeId::from_key(&public_key);
    let mut test_peer = Peer::new(
        public_key,
        derived_node_id,
        MultiaddressesWithStats::default(),
        PeerFlags::default(),
        features,
        Default::default(),
        Default::default(),
    );
    if ban_flag {
        test_peer.ban_for(Duration::from_secs(1000), "".to_string());
    }
    // Attach the internal-only address set.
    add_internal_addresses(&mut test_peer);
    test_peer
}
/// Adds one address from each internal/non-routable range to `peer`, in random
/// order. The literal ranges covered are: IPv4 loopback (127.0.0.0/8), IPv4
/// unspecified (0.0.0.0), the RFC 1918 private ranges (10.0.0.0/8,
/// 172.16.0.0/12, 192.168.0.0/16), IPv6 loopback (::1) and IPv6 unspecified
/// (::). Each address uses a disjoint TCP port range so failures are easy to
/// attribute to a specific address kind.
#[cfg(test)]
fn add_internal_addresses(peer: &mut Peer) {
    use rand::{Rng, prelude::SliceRandom};

    // Cache the thread-local RNG once instead of re-fetching it per call.
    let mut rng = rand::thread_rng();
    let mut addresses: Vec<Multiaddr> = Vec::with_capacity(7);

    // IPv4 loopback: 127.x.x.x, ports 9000..9100
    addresses.push(
        format!(
            "/ip4/127.{}.{}.{}/tcp/{}",
            rng.gen_range(0..255),
            rng.gen_range(0..255),
            rng.gen_range(0..255),
            rng.gen_range(9000..9100)
        )
        .parse::<Multiaddr>()
        .unwrap(),
    );
    // IPv4 unspecified: 0.0.0.0, ports 9100..9200
    addresses.push(
        format!("/ip4/0.0.0.0/tcp/{}", rng.gen_range(9100..9200))
            .parse::<Multiaddr>()
            .unwrap(),
    );
    // RFC 1918 private: 10.x.x.x, ports 9200..9300
    addresses.push(
        format!(
            "/ip4/10.{}.{}.{}/tcp/{}",
            rng.gen_range(0..255),
            rng.gen_range(0..255),
            rng.gen_range(0..255),
            rng.gen_range(9200..9300)
        )
        .parse::<Multiaddr>()
        .unwrap(),
    );
    // RFC 1918 private: 172.16-31.x.x, ports 9300..9400
    addresses.push(
        format!(
            "/ip4/172.{}.{}.{}/tcp/{}",
            rng.gen_range(16..=31),
            rng.gen_range(0..255),
            rng.gen_range(0..255),
            rng.gen_range(9300..9400)
        )
        .parse::<Multiaddr>()
        .unwrap(),
    );
    // RFC 1918 private: 192.168.x.x, ports 9400..9500
    addresses.push(
        format!(
            "/ip4/192.168.{}.{}/tcp/{}",
            rng.gen_range(0..255),
            rng.gen_range(0..255),
            rng.gen_range(9400..9500)
        )
        .parse::<Multiaddr>()
        .unwrap(),
    );
    // IPv6 loopback: ::1, ports 9500..9600
    addresses.push(
        format!("/ip6/::1/tcp/{}", rng.gen_range(9500..9600))
            .parse::<Multiaddr>()
            .unwrap(),
    );
    // IPv6 unspecified: ::, ports 9600..9700
    addresses.push(
        format!("/ip6/::/tcp/{}", rng.gen_range(9600..9700))
            .parse::<Multiaddr>()
            .unwrap(),
    );

    // Randomise the order so tests don't depend on insertion order.
    addresses.shuffle(&mut rng);
    peer.addresses
        .add_or_update_addresses(&addresses, &PeerAddressSource::Config);
}
/// Builds a `PeerAddressSource::FromPeerConnection` whose identity claim covers
/// `addresses` and `peer_features`, signed with a freshly generated keypair.
/// The signature is verified before the claim is returned.
#[cfg(test)]
pub fn create_peer_address_source_with_claim(
    addresses: Vec<Multiaddr>,
    peer_features: PeerFeatures,
) -> PeerAddressSource {
    use chrono::Utc;
    use rand::rngs::OsRng;
    use tari_crypto::keys::SecretKey;

    use crate::{
        peer_manager::{IdentitySignature, PeerIdentityClaim},
        types::CommsSecretKey,
    };

    // Signs the address list with a throwaway key and sanity-checks the result.
    fn create_identity_signature(addresses: &[Multiaddr], peer_features: PeerFeatures) -> IdentitySignature {
        let secret = CommsSecretKey::random(&mut OsRng);
        let public_key = CommsPublicKey::from_secret_key(&secret);
        let updated_at = Utc::now();
        let identity = IdentitySignature::sign_new(&secret, peer_features, addresses, updated_at);
        assert!(
            identity.is_valid(&public_key, peer_features, addresses).unwrap(),
            "Signature is not valid"
        );
        identity
    }

    // Sign first, then move the address list into the claim (no clone needed).
    let signature = create_identity_signature(&addresses, peer_features);
    PeerAddressSource::FromPeerConnection {
        peer_identity_claim: PeerIdentityClaim {
            addresses,
            features: peer_features,
            signature,
        },
    }
}
#[cfg(test)]
mod test {
    #![allow(clippy::indexing_slicing)]
    use chrono::{DateTime, Utc};
    use tari_common_sqlite::connection::DbConnection;

    use super::*;
    use crate::peer_manager::database::{MIGRATIONS, PeerDatabaseSql};

    // Builds a PeerManager backed by a migrated temp-file database, seeded with
    // a random local identity peer.
    fn create_peer_manager() -> PeerManager {
        let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
        let peers_db = PeerDatabaseSql::new(
            db_connection,
            &create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
        )
        .unwrap();
        PeerManager::new(peers_db, TransportProtocol::get_all()).unwrap()
    }

    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn test_get_broadcast_identities() {
        let peer_manager = create_peer_manager();
        // Add one banned peer, then 18 unbanned peers, then another banned
        // peer: 20 stored test peers, of which 18 are not banned.
        let mut test_peers = vec![create_test_peer(true, PeerFeatures::COMMUNICATION_NODE)];
        assert!(
            peer_manager
                .add_or_update_peer(test_peers[test_peers.len() - 1].clone())
                .await
                .is_ok()
        );
        for _i in 0..18 {
            test_peers.push(create_test_peer(false, PeerFeatures::COMMUNICATION_NODE));
            assert!(
                peer_manager
                    .add_or_update_peer(test_peers[test_peers.len() - 1].clone())
                    .await
                    .is_ok()
            );
        }
        test_peers.push(create_test_peer(true, PeerFeatures::COMMUNICATION_NODE));
        assert!(
            peer_manager
                .add_or_update_peer(test_peers[test_peers.len() - 1].clone())
                .await
                .is_ok()
        );
        // A direct identity lookup on a stored peer returns that exact peer.
        let selected_peers = peer_manager
            .direct_identity_node_id(&test_peers[2].node_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(selected_peers.node_id, test_peers[2].node_id);
        assert_eq!(selected_peers.public_key, test_peers[2].public_key);
        // A direct identity lookup on a peer never added returns None.
        let unmanaged_peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
        assert!(
            peer_manager
                .direct_identity_node_id(&unmanaged_peer.node_id)
                .await
                .unwrap()
                .is_none()
        );
        // Only the 18 unbanned peers are returned, and each really is unbanned.
        let selected_peers = peer_manager.get_not_banned_or_deleted_peers().await.unwrap();
        assert_eq!(selected_peers.len(), 18);
        for peer_identity in &selected_peers {
            assert!(
                !peer_manager
                    .find_by_node_id(&peer_identity.node_id)
                    .await
                    .unwrap()
                    .unwrap()
                    .is_banned(),
            );
        }
        // Two random draws of 10 from 18 peers should (with overwhelming
        // probability) differ in order or membership.
        let identities1 = peer_manager.random_peers(10, &[], None).await.unwrap();
        let identities2 = peer_manager.random_peers(10, &[], None).await.unwrap();
        assert_ne!(identities1, identities2);
    }

    #[tokio::test]
    async fn test_add_or_update_online_peer() {
        let peer_manager = create_peer_manager();
        let peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
        peer_manager.add_or_update_peer(peer.clone()).await.unwrap();
        // Re-registering the same peer as online (with no new addresses) must
        // leave it in a non-offline state.
        let peer = peer_manager
            .add_or_update_online_peer(
                &peer.public_key,
                &peer.node_id,
                &[],
                &peer.features,
                &PeerAddressSource::Config,
            )
            .await
            .unwrap();
        assert!(!peer.is_offline());
    }

    // Fetches `update_peer` back from storage and asserts that:
    // - the newest claim time is strictly newer than `previous_claim_time`
    //   (when one is given),
    // - every stored address carries a FromPeerConnection claim with that same
    //   newest time,
    // - the total peer count equals `expected_count`,
    // - the stored address set equals `update_peer`'s address set.
    // Returns the newest claim time for chaining into the next step.
    async fn validate_claim_bump_by_newer(
        peer_manager: &PeerManager,
        update_peer: &Peer,
        previous_claim_time: Option<DateTime<Utc>>,
        expected_count: usize,
    ) -> DateTime<Utc> {
        let peer_from_db = peer_manager
            .find_by_node_id(&update_peer.node_id)
            .await
            .unwrap()
            .unwrap();
        let newest_time = peer_from_db.addresses.newest_claim_updated_at().unwrap();
        if let Some(prev_time) = previous_claim_time {
            assert!(
                newest_time > prev_time,
                "New claim time was not newer than previous claim time"
            );
        }
        for addr in peer_from_db.addresses.addresses() {
            let claim_time = match addr.source() {
                PeerAddressSource::FromPeerConnection { peer_identity_claim } => {
                    peer_identity_claim.signature.updated_at()
                },
                _ => panic!("Expected FromPeerConnection source for address: {}", addr.address()),
            };
            assert_eq!(
                claim_time, newest_time,
                "Address claim time inconsistent among addresses"
            );
        }
        assert_eq!(peer_manager.count().await, expected_count, "Peer count mismatch");
        let mut expected_addresses = update_peer
            .addresses
            .addresses()
            .iter()
            .map(|a| a.address().clone())
            .collect::<Vec<_>>();
        let mut addresses_from_db = peer_from_db
            .addresses
            .addresses()
            .iter()
            .map(|a| a.address().clone())
            .collect::<Vec<_>>();
        expected_addresses.sort();
        addresses_from_db.sort();
        assert_eq!(expected_addresses, addresses_from_db);
        newest_time
    }

    #[tokio::test]
    async fn it_correctly_merges_old_and_new_address_claims() {
        let peer_manager = create_peer_manager();
        let mut peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
        peer_manager.add_or_update_peer(peer.clone()).await.unwrap();
        let claim_1_time = validate_claim_bump_by_newer(&peer_manager, &peer, None, 1).await;
        // Sleep so the next claim's timestamp is measurably newer.
        tokio::time::sleep(Duration::from_millis(150)).await;
        // Re-claim the same addresses with a fresh (newer) identity claim.
        let peer_addresses = peer
            .addresses
            .addresses()
            .iter()
            .map(|a| a.address().clone())
            .collect::<Vec<_>>();
        let peer_address_source = create_peer_address_source_with_claim(peer_addresses.clone(), peer.features);
        peer.addresses
            .add_or_update_addresses(&peer_addresses, &peer_address_source);
        peer_manager.add_or_update_peer(peer.clone()).await.unwrap();
        let claim_2_time = validate_claim_bump_by_newer(&peer_manager, &peer, Some(claim_1_time), 1).await;
        tokio::time::sleep(Duration::from_millis(150)).await;
        // A brand-new peer reusing the same node id/public key replaces the
        // address set with its own, newer-claimed addresses.
        let mut update_peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
        update_peer.node_id = peer.node_id.clone();
        update_peer.public_key = peer.public_key.clone();
        peer_manager.add_or_update_peer(update_peer.clone()).await.unwrap();
        let _claim_3_time = validate_claim_bump_by_newer(&peer_manager, &update_peer, Some(claim_2_time), 1).await;
        // Re-adding the stale `peer` must not overwrite the newer claim: the
        // stored state must still match `update_peer`.
        // NOTE(review): this binding shadows the `_claim_3_time` above with the
        // same name — consider a distinct name to make the intent explicit.
        peer_manager.add_or_update_peer(peer.clone()).await.unwrap();
        let _claim_3_time = validate_claim_bump_by_newer(&peer_manager, &update_peer, Some(claim_2_time), 1).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_concurrent_add_or_update_and_get_random_peers() {
        let peer_manager = create_peer_manager();
        let num_peers = 75;
        let num_write_tasks = 20;
        let num_read_tasks = 1500;
        let n = 100;
        // Writer tasks: each adds `num_peers` fresh peers (with random micro
        // sleeps to interleave), then re-adds a subset with bumped last-seen,
        // then sets metadata on another subset.
        let add_tasks: Vec<_> = (0..num_write_tasks)
            .map(|_| {
                let peer_manager = peer_manager.clone();
                tokio::spawn(async move {
                    let mut peers_to_update_last_seen = Vec::new();
                    let mut peers_to_set_metadata = Vec::new();
                    for i in 0..num_peers {
                        let peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
                        if i % 7 == 0 {
                            peers_to_update_last_seen.push(peer.clone());
                        }
                        if i % 11 == 0 {
                            peers_to_set_metadata.push(peer.clone());
                        }
                        // NOTE(review): this unconditional push makes the
                        // `i % 7` filter above redundant — every peer ends up
                        // in `peers_to_update_last_seen`, with duplicates when
                        // `i % 7 == 0`. Confirm which behavior is intended.
                        peers_to_update_last_seen.push(peer.clone());
                        peer_manager.add_or_update_peer(peer).await.unwrap();
                        tokio::time::sleep(Duration::from_micros(rand::random::<u64>() % 100)).await;
                    }
                    for peer in &mut peers_to_update_last_seen {
                        let addresses = peer.addresses.addresses().to_vec();
                        peer.addresses.mark_last_seen_now(addresses[0].address());
                        peer_manager.add_or_update_peer(peer.clone()).await.unwrap();
                        tokio::time::sleep(Duration::from_micros(rand::random::<u64>() % 100)).await;
                    }
                    for (key, peer) in peers_to_set_metadata.iter().enumerate() {
                        peer_manager
                            .set_peer_metadata(
                                &peer.node_id,
                                u8::try_from(key % usize::from(u8::MAX)).unwrap_or_default(),
                                vec![1, 2, 3],
                            )
                            .await
                            .unwrap();
                        tokio::time::sleep(Duration::from_micros(rand::random::<u64>() % 100)).await;
                    }
                    Ok::<_, PeerManagerError>(())
                })
            })
            .collect();
        // Reader tasks: interleaved random-peer draws and counts.
        let get_tasks: Vec<_> = (0..num_read_tasks)
            .map(|_| {
                let peer_manager = peer_manager.clone();
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_micros(rand::random::<u64>() % 100)).await;
                    let _random_peers = peer_manager.random_peers(n, &[], None).await.unwrap();
                    tokio::time::sleep(Duration::from_micros(rand::random::<u64>() % 100)).await;
                    let _total_peers = peer_manager.count().await;
                    Ok::<_, PeerManagerError>(())
                })
            })
            .collect();
        // Every task must complete without panicking or returning an error.
        let all_tasks = add_tasks.into_iter().chain(get_tasks);
        for (i, task) in all_tasks.enumerate() {
            match task.await {
                Ok(Ok(_)) => { },
                Ok(Err(e)) => panic!("Task {i} failed with PeerManagerError: {e:?}"),
                Err(e) => panic!("Task {i} panicked: {e:?}"),
            }
        }
        tokio::time::sleep(Duration::from_micros(rand::random::<u64>() % 100)).await;
        // Re-adds and metadata writes must not change the total: exactly
        // num_peers * num_write_tasks distinct peers were stored.
        let random_peers = peer_manager.random_peers(n, &[], None).await.unwrap();
        let total_peers = peer_manager.count().await;
        assert_eq!(total_peers, num_peers * num_write_tasks);
        assert!(random_peers.len() <= n);
    }
}