use std::{cmp::min, time::Duration};
use log::*;
use multiaddr::Multiaddr;
use crate::{
net_address::PeerAddressSource,
peer_manager::{
NodeId,
PeerFeatures,
PeerFlags,
PeerManagerError,
database::{PeerDatabaseSql, ThisPeerIdentity},
peer::Peer,
peer_id::PeerId,
},
types::{CommsDatabase, CommsPublicKey, TransportProtocol},
};
/// Log target passed to the logging macros used in this module.
const LOG_TARGET: &str = "comms::peer_manager::peer_storage_sql";
/// Default and upper bound for the number of peers requested by `discovery_syncing`; also the limit passed by
/// `get_not_banned_or_deleted_peers`.
const PEER_MANAGER_SYNC_PEERS: usize = 100;
/// Threshold duration handed to the peer database when selecting peers for discovery syncing:
/// (5 days * 24 h * 60 min * 60 s) / 2 = 216_000 s, i.e. 2.5 days.
/// NOTE(review): presumably peers not seen within this window count as stale — confirm against
/// `get_n_random_active_peers`.
pub const STALE_PEER_THRESHOLD_DURATION: Duration = Duration::from_secs(5 * 24 * 60 * 60 / 2);
/// Peer storage wrapper around a [`PeerDatabaseSql`]; its methods forward to the database and convert the
/// database's errors into [`PeerManagerError`].
#[derive(Clone)]
pub struct PeerStorageSql {
/// The SQL peer database that the methods of this type delegate to.
peer_db: PeerDatabaseSql,
}
impl PeerStorageSql {
/// Wraps an existing peer database in a `PeerStorageSql`.
///
/// Emits a trace-level log entry with the database's current entry count. The constructor has no failure path of
/// its own and always returns `Ok`.
pub fn new_indexed(database: PeerDatabaseSql) -> Result<PeerStorageSql, PeerManagerError> {
    let storage = PeerStorageSql { peer_db: database };
    trace!(
        target: LOG_TARGET,
        "Peer storage is initialized. {} total entries.",
        storage.peer_db.size(),
    );
    Ok(storage)
}
/// Returns the `ThisPeerIdentity` reported by the underlying peer database.
pub fn this_peer_identity(&self) -> ThisPeerIdentity {
self.peer_db.this_peer_identity()
}
/// Returns the entry count reported by the underlying peer database (`size()`).
pub fn count(&self) -> usize {
self.peer_db.size()
}
/// Hands `peer` to the database's `add_or_update_peer` and returns the resulting [`PeerId`].
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, PeerManagerError> {
    let peer_id = self.peer_db.add_or_update_peer(peer)?;
    Ok(peer_id)
}
/// Forwards the given identity, addresses, features and address source to the database's
/// `add_or_update_online_peer` and returns the [`Peer`] it yields.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn add_or_update_online_peer(
    &self,
    pubkey: &CommsPublicKey,
    node_id: &NodeId,
    addresses: &[Multiaddr],
    peer_features: &PeerFeatures,
    source: &PeerAddressSource,
) -> Result<Peer, PeerManagerError> {
    let peer = self
        .peer_db
        .add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)?;
    Ok(peer)
}
pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
self.peer_db.soft_delete_peer(node_id)?;
Ok(())
}
/// Looks up a peer by its node id. Callers in this file treat `Ok(None)` as "peer not found".
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn get_peer_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
    let maybe_peer = self.peer_db.get_peer_by_node_id(node_id)?;
    Ok(maybe_peer)
}
/// Returns the peers the database yields for the given list of node ids.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, PeerManagerError> {
    let peers = self.peer_db.get_peers_by_node_ids(node_ids)?;
    Ok(peers)
}
/// Returns the public keys the database yields for the given list of node ids.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn get_peer_public_keys_by_node_ids(
    &self,
    node_ids: &[NodeId],
) -> Result<Vec<CommsPublicKey>, PeerManagerError> {
    let public_keys = self.peer_db.get_peer_public_keys_by_node_ids(node_ids)?;
    Ok(public_keys)
}
/// Returns the peers the database reports as banned.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn get_banned_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
    let banned = self.peer_db.get_banned_peers()?;
    Ok(banned)
}
/// Returns the peers the database matches against the partial key bytes in `partial`.
///
/// NOTE(review): the method name suggests prefix matching; the exact rule lives in the database's
/// `find_all_peers_match_partial_key` — confirm there.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn find_all_starts_with(&self, partial: &[u8]) -> Result<Vec<Peer>, PeerManagerError> {
    let matches = self.peer_db.find_all_peers_match_partial_key(partial)?;
    Ok(matches)
}
/// Looks up a peer by its public key. Callers in this file treat `Ok(None)` as "peer not found".
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn find_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, PeerManagerError> {
    let maybe_peer = self.peer_db.get_peer_by_public_key(public_key)?;
    Ok(maybe_peer)
}
/// Reports whether the database has an entry for `public_key`.
///
/// Returns `Ok(true)` when the database lookup yields a value and `Ok(false)` when it yields none.
///
/// # Errors
/// Database errors are propagated (converted into [`PeerManagerError`]) instead of being reported as
/// `Ok(false)`, so a failing database can no longer be mistaken for "peer does not exist".
pub fn exists_public_key(&self, public_key: &CommsPublicKey) -> Result<bool, PeerManagerError> {
    let entry = self.peer_db.peer_exists_by_public_key(public_key)?;
    Ok(entry.is_some())
}
/// Reports whether the database has an entry for `node_id`.
///
/// Returns `Ok(true)` when the database lookup yields a value and `Ok(false)` when it yields none.
///
/// # Errors
/// Database errors are propagated (converted into [`PeerManagerError`]) instead of being reported as
/// `Ok(false)`, so a failing database can no longer be mistaken for "peer does not exist".
pub fn exists_node_id(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
    let entry = self.peer_db.peer_exists_by_node_id(node_id)?;
    Ok(entry.is_some())
}
/// Fetches the peer with the given node id, rejecting banned peers.
///
/// # Errors
/// * the error built by `PeerManagerError::peer_not_found` when no peer with `node_id` is stored,
/// * `PeerManagerError::BannedPeer` when the stored peer is banned,
/// * any database error, converted into [`PeerManagerError`].
pub fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Peer, PeerManagerError> {
    let peer = self
        .get_peer_by_node_id(node_id)?
        // Build the not-found error only when it is actually needed.
        .ok_or_else(|| PeerManagerError::peer_not_found(node_id))?;
    if peer.is_banned() {
        return Err(PeerManagerError::BannedPeer);
    }
    Ok(peer)
}
/// Fetches the peer with the given public key, rejecting banned peers.
///
/// # Errors
/// * the error built by `PeerManagerError::peer_not_found` (for the node id derived from `public_key`) when no
///   such peer is stored,
/// * `PeerManagerError::BannedPeer` when the stored peer is banned,
/// * any database error, converted into [`PeerManagerError`].
pub fn direct_identity_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
    let peer = self
        .find_by_public_key(public_key)?
        // Derive the node id and build the error only on the not-found path.
        .ok_or_else(|| PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))?;
    if peer.is_banned() {
        return Err(PeerManagerError::BannedPeer);
    }
    Ok(peer)
}
/// Returns the peers yielded by the database's `get_all_peers`, passing `features` through unchanged.
///
/// NOTE(review): presumably `Some(features)` restricts the result to peers with those features and `None` returns
/// every peer — confirm against the database implementation.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn all(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, PeerManagerError> {
    let peers = self.peer_db.get_all_peers(features)?;
    Ok(peers)
}
/// Selects random active peers for discovery syncing.
///
/// `n` is the requested number of peers: `0` means "use the default" (`PEER_MANAGER_SYNC_PEERS`), and any other
/// value is capped at `PEER_MANAGER_SYNC_PEERS`. `excluded_peers`, `features` and `external_addresses_only` are
/// passed through to the database's `get_n_random_active_peers`, together with
/// `STALE_PEER_THRESHOLD_DURATION` as the stale threshold.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn discovery_syncing(
    &self,
    mut n: usize,
    excluded_peers: &[NodeId],
    features: Option<PeerFeatures>,
    external_addresses_only: bool,
) -> Result<Vec<Peer>, PeerManagerError> {
    n = match n {
        0 => PEER_MANAGER_SYNC_PEERS,
        requested => min(requested, PEER_MANAGER_SYNC_PEERS),
    };
    // The `None` and empty-slice arguments leave the database's remaining optional filters unset.
    let peers = self.peer_db.get_n_random_active_peers(
        n,
        excluded_peers,
        features,
        None,
        Some(STALE_PEER_THRESHOLD_DURATION),
        external_addresses_only,
        &[],
    )?;
    Ok(peers)
}
/// Returns the result of the database's `get_n_not_banned_or_deleted_peers`, called with
/// `PEER_MANAGER_SYNC_PEERS` as the count.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn get_not_banned_or_deleted_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
    let peers = self
        .peer_db
        .get_n_not_banned_or_deleted_peers(PEER_MANAGER_SYNC_PEERS)?;
    Ok(peers)
}
pub fn get_available_dial_candidates(
&self,
exclude_node_ids: &[NodeId],
limit: Option<usize>,
transport_protocols: &[TransportProtocol],
exclude_failed: bool,
randomize: bool,
) -> Result<Vec<Peer>, PeerManagerError> {
Ok(self.peer_db.get_available_dial_candidates(
exclude_node_ids,
limit,
transport_protocols,
exclude_failed,
randomize,
)?)
}
/// Returns the seed peers reported by the database and logs their short node ids at trace level.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn get_seed_peers(&self) -> Result<Vec<Peer>, PeerManagerError> {
    let seeds = self.peer_db.get_seed_peers()?;
    trace!(
        target: LOG_TARGET,
        "Get seed peers: {:?}",
        seeds.iter().map(|seed| seed.node_id.short_str()).collect::<Vec<_>>(),
    );
    Ok(seeds)
}
/// Returns the peers yielded by the database's `get_n_random_peers`; every argument is passed through unchanged.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn random_peers(
    &self,
    n: usize,
    exclude_peers: &[NodeId],
    flags: Option<PeerFlags>,
    transport_protocols: &[TransportProtocol],
) -> Result<Vec<Peer>, PeerManagerError> {
    let peers = self
        .peer_db
        .get_n_random_peers(n, exclude_peers, flags, transport_protocols)?;
    Ok(peers)
}
/// Clears the ban on the peer identified by `node_id` via the database's `reset_banned`; the value returned by
/// the database is discarded.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn unban_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
    self.peer_db.reset_banned(node_id)?;
    Ok(())
}
/// Clears all bans via the database's `reset_all_banned` and returns the count it reports.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn unban_all_peers(&self) -> Result<usize, PeerManagerError> {
    Ok(self.peer_db.reset_all_banned()?)
}
pub fn reset_offline_non_wallet_peers(&self) -> Result<usize, PeerManagerError> {
let number_offline = self.peer_db.reset_offline_non_wallet_peers()?;
Ok(number_offline)
}
/// Bans the peer that owns `public_key` for `duration`, recording `reason`.
///
/// The node id is derived from the public key and handed to the database's `set_banned`; the node id the
/// database returns is passed back to the caller.
///
/// # Errors
/// * the error built by `PeerManagerError::peer_not_found` when the database reports no matching peer,
/// * any database error, converted into [`PeerManagerError`].
pub fn ban_peer(
    &self,
    public_key: &CommsPublicKey,
    duration: Duration,
    reason: String,
) -> Result<NodeId, PeerManagerError> {
    let node_id = NodeId::from_key(public_key);
    self.peer_db
        .set_banned(&node_id, duration, reason)?
        // Built lazily so the success path does not derive a second node id or allocate an error.
        // NOTE(review): this reports `NodeId::from_public_key` while the lookup used `NodeId::from_key`;
        // presumably both yield the same id — confirm, then reuse `node_id` here.
        .ok_or_else(|| PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))
}
/// Bans the peer identified by `node_id` for `duration`, recording `reason`, and returns the node id reported by
/// the database's `set_banned`.
///
/// # Errors
/// * the error built by `PeerManagerError::peer_not_found` when the database reports no matching peer,
/// * any database error, converted into [`PeerManagerError`].
pub fn ban_peer_by_node_id(
    &self,
    node_id: &NodeId,
    duration: Duration,
    reason: String,
) -> Result<NodeId, PeerManagerError> {
    self.peer_db
        .set_banned(node_id, duration, reason)?
        // Build the not-found error only when it is actually needed.
        .ok_or_else(|| PeerManagerError::peer_not_found(node_id))
}
/// Reports whether the peer identified by `node_id` is currently banned (`Peer::is_banned`).
///
/// # Errors
/// * the error built by `PeerManagerError::peer_not_found` when no peer with `node_id` is stored,
/// * any database error, converted into [`PeerManagerError`].
pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
    let peer = self
        .get_peer_by_node_id(node_id)?
        // Build the not-found error only when it is actually needed.
        .ok_or_else(|| PeerManagerError::peer_not_found(node_id))?;
    Ok(peer.is_banned())
}
/// Stores `data` under metadata `key` for the peer identified by `node_id` via the database's `set_metadata`, and
/// returns whatever the database returns.
///
/// NOTE(review): the `Option<Vec<u8>>` is presumably the previously stored value for `key`, if any — confirm
/// against the database implementation.
///
/// # Errors
/// Any database error, converted into [`PeerManagerError`].
pub fn set_peer_metadata(
    &self,
    node_id: &NodeId,
    key: u8,
    data: Vec<u8>,
) -> Result<Option<Vec<u8>>, PeerManagerError> {
    let returned = self.peer_db.set_metadata(node_id, key, data)?;
    Ok(returned)
}
}
/// Unwraps the storage wrapper, yielding the underlying peer database.
///
/// Implemented as `From` rather than `Into`: the standard library's blanket impl still provides
/// `Into<CommsDatabase> for PeerStorageSql`, so existing `.into()` call sites keep working and no lint
/// suppression is needed.
impl From<PeerStorageSql> for CommsDatabase {
    fn from(storage: PeerStorageSql) -> Self {
        storage.peer_db
    }
}
#[cfg(test)]
mod test {
#![allow(clippy::indexing_slicing)]
use std::{borrow::BorrowMut, iter::repeat_with};
use chrono::{DateTime, Utc};
use multiaddr::Multiaddr;
use rand::Rng;
use tari_common_sqlite::connection::DbConnection;
use super::*;
use crate::{
net_address::{MultiaddrWithStats, MultiaddressesWithStats, PeerAddressSource},
peer_manager::{create_test_peer_add_internal_addresses, database::MIGRATIONS, peer::PeerFlags},
};
/// Creates a migrated temp-file database and wraps it in a `PeerDatabaseSql`, using a freshly generated
/// communication-node test peer as the second constructor argument.
fn get_peer_db_sql_test_db() -> Result<PeerDatabaseSql, PeerManagerError> {
    let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
    let this_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
    let database = PeerDatabaseSql::new(db_connection, &this_peer)?;
    Ok(database)
}
/// Builds a `PeerStorageSql` on top of a fresh test database.
fn get_peer_storage_sql_test_db() -> Result<PeerStorageSql, PeerManagerError> {
    let database = get_peer_db_sql_test_db()?;
    PeerStorageSql::new_indexed(database)
}
/// Populates a peer database directly, then wraps it in a `PeerStorageSql` and checks that the same peers are
/// visible through the storage wrapper.
#[test]
fn test_restore() {
    // Builds a peer with a fresh random keypair; the first address seeds the address list and the remaining ones
    // are added one at a time, all with `PeerAddressSource::Config`.
    let build_peer = |addresses: &[&str]| -> Peer {
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rand::rngs::OsRng);
        let node_id = NodeId::from_key(&pk);
        let mut parsed = addresses.iter().map(|addr| addr.parse::<Multiaddr>().unwrap());
        let first = parsed.next().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![first], &PeerAddressSource::Config);
        for addr in parsed {
            net_addresses.add_or_update_addresses(&[addr], &PeerAddressSource::Config);
        }
        Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        )
    };

    let peer1 = build_peer(&["/ip4/1.2.3.4/tcp/8000", "/ip4/5.6.7.8/tcp/8000", "/ip4/5.6.7.8/tcp/7000"]);
    let peer2 = build_peer(&["/ip4/9.10.11.12/tcp/7000"]);
    let peer3 = build_peer(&["/ip4/13.14.15.16/tcp/6000", "/ip4/17.18.19.20/tcp/8000"]);
    let peers = [&peer1, &peer2, &peer3];

    // Populate the raw database before any storage wrapper exists.
    let db = get_peer_db_sql_test_db().unwrap();
    for peer in peers {
        assert!(db.add_or_update_peer(peer.clone()).is_ok());
    }
    assert_eq!(db.size(), 3);
    for peer in peers {
        // `is_ok()` alone would also pass on `Ok(None)`; require that the stored peer is actually returned.
        let found = db.get_peer_by_public_key(&peer.public_key).ok().flatten();
        assert_eq!(found.as_ref().map(|p| &p.public_key), Some(&peer.public_key));
    }

    // Wrapping the populated database must expose the same peers.
    let peer_storage = PeerStorageSql::new_indexed(db).unwrap();
    assert_eq!(peer_storage.peer_db.size(), 3);
    for peer in peers {
        let found = peer_storage.find_by_public_key(&peer.public_key).unwrap();
        assert_eq!(found.as_ref().map(|p| &p.public_key), Some(&peer.public_key));
    }
}
/// Adds three peers, checks that each can be found by public key and by node id, then soft-deletes the third and
/// checks that it is still stored but carries a `deleted_at` timestamp.
#[allow(clippy::too_many_lines)]
#[test]
fn test_add_delete_find_peer() {
    let peer_storage = get_peer_storage_sql_test_db().unwrap();

    // Builds a peer with a fresh random keypair; the first address seeds the address list and the remaining ones
    // are added one at a time, all with `PeerAddressSource::Config`.
    let build_peer = |addresses: &[&str]| -> Peer {
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rand::rngs::OsRng);
        let node_id = NodeId::from_key(&pk);
        let mut parsed = addresses.iter().map(|addr| addr.parse::<Multiaddr>().unwrap());
        let first = parsed.next().unwrap();
        let mut net_addresses =
            MultiaddressesWithStats::from_addresses_with_source(vec![first], &PeerAddressSource::Config);
        for addr in parsed {
            net_addresses.add_or_update_addresses(&[addr], &PeerAddressSource::Config);
        }
        Peer::new(
            pk,
            node_id,
            net_addresses,
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        )
    };

    let peer1 = build_peer(&["/ip4/1.2.3.4/tcp/8000", "/ip4/5.6.7.8/tcp/8000", "/ip4/5.6.7.8/tcp/7000"]);
    let peer2 = build_peer(&["/ip4/9.10.11.12/tcp/7000"]);
    let peer3 = build_peer(&["/ip4/13.14.15.16/tcp/6000", "/ip4/17.18.19.20/tcp/8000"]);

    peer_storage.add_or_update_peer(peer1.clone()).unwrap();
    assert!(peer_storage.add_or_update_peer(peer2.clone()).is_ok());
    assert!(peer_storage.add_or_update_peer(peer3.clone()).is_ok());
    assert_eq!(peer_storage.peer_db.size(), 3);

    // Every peer is retrievable by public key and by node id.
    for expected in [&peer1, &peer2, &peer3] {
        let by_key = peer_storage.find_by_public_key(&expected.public_key).unwrap().unwrap();
        assert_eq!(by_key.public_key, expected.public_key);
    }
    for expected in [&peer1, &peer2, &peer3] {
        let by_id = peer_storage.get_peer_by_node_id(&expected.node_id).unwrap().unwrap();
        assert_eq!(by_id.node_id, expected.node_id);
    }
    for expected in [&peer1, &peer2, &peer3] {
        peer_storage.find_by_public_key(&expected.public_key).unwrap().unwrap();
    }

    // Soft-deleting keeps the record (the size is unchanged) but marks it as deleted.
    assert!(peer_storage.soft_delete_peer(&peer3.node_id).is_ok());
    assert_eq!(peer_storage.peer_db.size(), 3);

    for expected in [&peer1, &peer2] {
        let by_key = peer_storage.find_by_public_key(&expected.public_key).unwrap().unwrap();
        assert_eq!(by_key.public_key, expected.public_key);
    }
    let deleted_by_key = peer_storage.find_by_public_key(&peer3.public_key).unwrap().unwrap();
    assert!(deleted_by_key.deleted_at.is_some());

    for expected in [&peer1, &peer2] {
        let by_id = peer_storage.get_peer_by_node_id(&expected.node_id).unwrap().unwrap();
        assert_eq!(by_id.node_id, expected.node_id);
    }
    let deleted_by_id = peer_storage.get_peer_by_node_id(&peer3.node_id).unwrap().unwrap();
    assert!(deleted_by_id.deleted_at.is_some());
}
/// Builds a peer with a random keypair, the given `features`, and between one and three random
/// `/ip4/a.b.c.d/tcp/port` addresses (octets in `1..255`, port in `5000..9000`). When `ban` is true the peer is
/// banned for 600 seconds with an empty reason.
fn create_test_peer(features: PeerFeatures, ban: bool) -> Peer {
    let (_sk, pk) = CommsPublicKey::random_keypair(&mut rand::rngs::OsRng);
    let node_id = NodeId::from_key(&pk);

    let mut net_addresses = MultiaddressesWithStats::from_addresses_with_source(vec![], &PeerAddressSource::Config);
    let mut rng = rand::thread_rng();
    let address_count = rng.gen_range(1..4);
    for _ in 0..address_count {
        let [a, b, c, d, port] = [
            rng.gen_range(1..255),
            rng.gen_range(1..255),
            rng.gen_range(1..255),
            rng.gen_range(1..255),
            rng.gen_range(5000..9000),
        ];
        let net_address = format!("/ip4/{}.{}.{}.{}/tcp/{}", a, b, c, d, port)
            .parse::<Multiaddr>()
            .unwrap();
        net_addresses.add_or_update_addresses(&[net_address], &PeerAddressSource::Config);
    }

    let mut peer = Peer::new(
        pk,
        node_id,
        net_addresses,
        PeerFlags::default(),
        features,
        Default::default(),
        Default::default(),
    );
    if ban {
        peer.ban_for(Duration::from_secs(600), String::new());
    }
    peer
}
/// Stores five seed-flagged peers and five ordinary peers, then checks that `get_seed_peers` returns exactly the
/// seed-flagged ones.
#[test]
fn get_just_seeds() {
    let peer_storage = get_peer_storage_sql_test_db().unwrap();

    let make_seed = || {
        let mut seed = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
        seed.add_flags(PeerFlags::SEED);
        seed
    };
    let seeds: Vec<Peer> = repeat_with(make_seed).take(5).collect();
    for seed in &seeds {
        peer_storage.add_or_update_peer(seed.clone()).unwrap();
    }

    let nodes: Vec<Peer> = repeat_with(|| create_test_peer(PeerFeatures::COMMUNICATION_NODE, false))
        .take(5)
        .collect();
    for node in &nodes {
        peer_storage.add_or_update_peer(node.clone()).unwrap();
    }

    let retrieved_seeds = peer_storage.get_seed_peers().unwrap();
    assert_eq!(retrieved_seeds.len(), seeds.len());
    for seed in &seeds {
        assert!(retrieved_seeds.iter().any(|p| p.node_id == seed.node_id));
    }
}
/// Stores five peers (never seen, last attempted beyond the stale threshold, banned, recently seen, recently seen
/// seed) and checks how many of them `discovery_syncing` returns with and without excluding the seed.
#[test]
fn discovery_syncing_returns_correct_peers() {
    let peer_storage = get_peer_storage_sql_test_db().unwrap();

    // A timestamp one minute older than the stale threshold.
    #[allow(clippy::cast_possible_wrap)]
    let stale_offset_secs = (STALE_PEER_THRESHOLD_DURATION.as_secs() + 60) as i64;
    let above_the_threshold = Utc::now().timestamp() - stale_offset_secs;

    // Marks the first address of `peer` as seen right now.
    let mark_first_address_seen = |peer: &mut Peer| {
        let addresses = peer.addresses.borrow_mut();
        let first = addresses.addresses()[0].address().clone();
        addresses.mark_last_seen_now(&first);
    };

    let never_seen_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
    let banned_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, true);

    // The best address of this peer was last attempted before the stale threshold.
    let mut not_active_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
    let stale_multiaddr = not_active_peer.addresses.best().unwrap().address().clone();
    let mut stale_address = MultiaddrWithStats::new(stale_multiaddr, PeerAddressSource::Config);
    stale_address.mark_last_attempted(DateTime::from_timestamp(above_the_threshold, 0).unwrap().naive_utc());
    not_active_peer
        .addresses
        .merge(&MultiaddressesWithStats::from(vec![stale_address]));

    let mut good_peer = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
    mark_first_address_seen(&mut good_peer);

    let mut good_seed = create_test_peer(PeerFeatures::COMMUNICATION_NODE, false);
    good_seed.flags = PeerFlags::SEED;
    mark_first_address_seen(&mut good_seed);

    for peer in [never_seen_peer, not_active_peer, banned_peer, good_peer, good_seed.clone()] {
        assert!(peer_storage.add_or_update_peer(peer).is_ok());
    }
    assert_eq!(peer_storage.all(None).unwrap().len(), 5);

    // With the seed excluded, exactly one peer is returned.
    let without_seed = peer_storage
        .discovery_syncing(100, &[good_seed.node_id], Some(PeerFeatures::COMMUNICATION_NODE), false)
        .unwrap();
    assert_eq!(without_seed.len(), 1);

    // Without exclusions, two peers are returned.
    let unrestricted = peer_storage
        .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), false)
        .unwrap();
    assert_eq!(unrestricted.len(), 2);
}
/// Stores nodes and wallets created by `create_test_peer_add_internal_addresses` and checks that
/// `discovery_syncing` returns only external addresses when `external_addresses_only` is set.
#[test]
fn discovery_syncing_peers_with_external_addresses_only() {
    let peer_storage = get_peer_storage_sql_test_db().unwrap();

    let nodes: Vec<Peer> =
        repeat_with(|| create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_NODE))
            .take(5)
            .collect();
    let wallets: Vec<Peer> =
        repeat_with(|| create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_CLIENT))
            .take(5)
            .collect();
    nodes.iter().chain(wallets.iter()).cloned().for_each(|peer| {
        peer_storage.add_or_update_peer(peer).unwrap();
    });

    let has_external = |p: &Peer| -> bool { p.addresses.addresses().iter().any(|addr| addr.is_external()) };
    let has_internal = |p: &Peer| -> bool { p.addresses.addresses().iter().any(|addr| !addr.is_external()) };
    let only_external = |p: &Peer| -> bool { p.addresses.addresses().iter().all(|addr| addr.is_external()) };

    // Without the filter, every returned node carries both external and non-external addresses.
    let nodes_all_addresses = peer_storage
        .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), false)
        .unwrap();
    assert!(nodes_all_addresses.iter().all(has_external));
    assert!(nodes_all_addresses.iter().all(has_internal));

    // With the filter, only external addresses remain.
    let nodes_external_addresses_only = peer_storage
        .discovery_syncing(100, &[], Some(PeerFeatures::COMMUNICATION_NODE), true)
        .unwrap();
    assert!(nodes_external_addresses_only.iter().all(only_external));
}
}