use std::{cmp::min, collections::HashMap, str::FromStr, time::Duration};
use bytes::Bytes;
use chrono::{NaiveDateTime, TimeDelta};
use diesel::{
self,
ExpressionMethods,
QueryDsl,
RunQueryDsl,
SqliteConnection,
dsl::not,
prelude::*,
r2d2::{ConnectionManager, PooledConnection},
};
use diesel_migrations::{EmbeddedMigrations, embed_migrations};
use log::{trace, warn};
use multiaddr::Multiaddr;
use tari_common_sqlite::{connection::DbConnection, error::StorageError};
use tari_utilities::{hex, hex::Hex};
use crate::{
net_address::{MultiaddrWithStats, MultiaddressesWithStats, PeerAddressSource},
peer_manager::{
NodeId,
Peer,
PeerFeatures,
PeerFlags,
PeerId,
generate_peer_id_as_i64,
peer_id::peer_id_from_i64,
storage::schema::{multi_addresses, node_identity, peers},
},
protocol::ProtocolId,
types::{CommsPublicKey, TransportProtocol},
utils::datetime::safe_future_datetime_from_duration,
};
/// Diesel migrations embedded at compile time from `./src/peer_manager/storage/migrations`.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./src/peer_manager/storage/migrations");
/// Target string passed to the `log` macros used in this module.
const LOG_TARGET: &str = "comms::peer_manager::storage::db";
/// Identity of the local node as held by [`PeerDatabaseSql`] and persisted to the `node_identity` table.
#[derive(Clone)]
pub struct ThisPeerIdentity {
/// Public key of the local node.
pub public_key: CommsPublicKey,
/// Node ID of the local node.
pub node_id: NodeId,
/// Feature flags of the local node.
pub features: PeerFeatures,
}
/// SQLite-backed peer store. Every method obtains a pooled connection from `connection`.
#[derive(Clone)]
pub struct PeerDatabaseSql {
// Source of pooled SQLite connections.
connection: DbConnection,
// Identity of the local node, captured when the store is created.
this_peer_identity: ThisPeerIdentity,
}
impl PeerDatabaseSql {
/// Creates a peer store over `connection` for the local node described by `this_peer`.
///
/// The public key, node ID and features of `this_peer` are copied into a [`ThisPeerIdentity`], which is
/// written to the database before the store is returned.
///
/// # Errors
/// Returns the `StorageError` raised while persisting the local identity.
pub fn new(connection: DbConnection, this_peer: &Peer) -> Result<Self, StorageError> {
    let this_peer_identity = ThisPeerIdentity {
        public_key: this_peer.public_key.clone(),
        node_id: this_peer.node_id.clone(),
        features: this_peer.features,
    };
    let db = Self {
        connection,
        this_peer_identity,
    };
    db.add_this_peer_node_identity_to_db()?;
    Ok(db)
}
/// Returns a copy of the local node's identity held by this store.
pub fn this_peer_identity(&self) -> ThisPeerIdentity {
    let identity = &self.this_peer_identity;
    identity.clone()
}
/// Ensures the `node_identity` table holds exactly the local node's identity.
///
/// Inside one immediate transaction:
/// - no stored row: the local identity is inserted;
/// - one stored row that matches the local public key and node ID: nothing is changed;
/// - one stored row that differs: a warning is logged and the row is overwritten;
/// - more than one stored row: an error is returned.
///
/// # Errors
/// Returns `StorageError::UnexpectedResult` for multiple stored identities or an unexpected affected-row
/// count, and propagates connection and query errors.
fn add_this_peer_node_identity_to_db(&self) -> Result<(), StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let stored_identities = node_identity::table.load::<NewThisPeerIdentitySql>(conn)?;
        let identity = &self.this_peer_identity;
        match stored_identities.as_slice() {
            [] => {
                let node_identity_sql = NewThisPeerIdentitySql {
                    public_key: identity.public_key.to_hex(),
                    node_id: identity.node_id.to_hex(),
                    features: identity.features.to_i32(),
                };
                let inserted = diesel::insert_into(node_identity::table)
                    .values(node_identity_sql)
                    .execute(conn)?;
                if inserted == 0 {
                    return Err(StorageError::UnexpectedResult(format!(
                        "Could not insert own node identity '{}'",
                        identity.node_id
                    )));
                }
                Ok(())
            },
            [stored] => {
                let public_key_hex = identity.public_key.to_hex();
                let node_id_hex = identity.node_id.to_hex();
                if public_key_hex == stored.public_key && node_id_hex == stored.node_id {
                    // Already up to date.
                    return Ok(());
                }
                warn!(target: LOG_TARGET,
                    "Node ID mismatch detected: {} (from file) vs {} (from DB). \
                    This may occur if the base_node_id file was deleted without updating the database. \
                    Updating database identity to match the file.",
                    node_id_hex,
                    stored.node_id
                );
                // The stale row is addressed by its old node ID and replaced in place.
                let affected =
                    diesel::update(node_identity::table.filter(node_identity::node_id.eq(stored.node_id.clone())))
                        .set((
                            node_identity::public_key.eq(public_key_hex),
                            node_identity::node_id.eq(node_id_hex),
                            node_identity::features.eq(identity.features.to_i32()),
                        ))
                        .execute(conn)?;
                if affected != 1 {
                    return Err(StorageError::UnexpectedResult(format!(
                        "Expected to update 1 node_identity row, updated {}",
                        affected
                    )));
                }
                Ok(())
            },
            many => Err(StorageError::UnexpectedResult(format!(
                "There are multiple node identities for this peer in the database, expected 1, found {}",
                many.len()
            ))),
        }
    })
}
/// Inserts `peer`, or merges it into the stored peer with the same node ID, in one immediate transaction.
///
/// Returns the ID of the stored peer. For an existing peer whose `id` is `None`, the default `PeerId`
/// is returned.
///
/// # Errors
/// Propagates connection, conversion and query errors as `StorageError`.
pub fn add_or_update_peer(&self, peer: Peer) -> Result<PeerId, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let node_id = &peer.node_id;
        let Some(mut stored_peer) = self.get_peer_by_node_id_inner(node_id, conn)? else {
            // Unknown node ID: insert a fresh row.
            trace!(target: LOG_TARGET, "Adding peer with node id '{node_id}'");
            let new_peer_sql = self.add_peer_sql(peer)?;
            return self.add_peer_inner(new_peer_sql, conn);
        };
        stored_peer.merge(&peer);
        let stored_id = stored_peer.id.unwrap_or_default();
        let update_peer_sql = PeerDatabaseSql::update_peer_sql(stored_peer)?;
        self.update_peer_inner(update_peer_sql, conn)?;
        Ok(stored_id)
    })
}
/// Records a peer that was seen online, in one immediate transaction.
///
/// If a peer with `node_id` is stored, `addresses` are added to or updated on it with `source`, its
/// features are replaced by `peer_features`, and the result is written back. Otherwise a new peer is built
/// from the arguments with default flags, inserted, and given its new ID.
///
/// Returns the peer as stored.
///
/// # Errors
/// Propagates connection, conversion and query errors as `StorageError`.
pub fn add_or_update_online_peer(
    &self,
    pubkey: &CommsPublicKey,
    node_id: &NodeId,
    addresses: &[Multiaddr],
    peer_features: &PeerFeatures,
    source: &PeerAddressSource,
) -> Result<Peer, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        if let Some(mut known_peer) = self.get_peer_by_node_id_inner(node_id, conn)? {
            known_peer.addresses.add_or_update_addresses(addresses, source);
            known_peer.features = *peer_features;
            let update_peer_sql = PeerDatabaseSql::update_peer_sql(known_peer.clone())?;
            self.update_peer_inner(update_peer_sql, conn)?;
            return Ok(known_peer);
        }

        let mut fresh_peer = Peer::new(
            pubkey.clone(),
            node_id.clone(),
            MultiaddressesWithStats::from_addresses_with_source(addresses.to_vec(), source),
            PeerFlags::default(),
            *peer_features,
            Default::default(),
            Default::default(),
        );
        let new_peer_sql = self.add_peer_sql(fresh_peer.clone())?;
        let peer_id = self.add_peer_inner(new_peer_sql, conn)?;
        fresh_peer.set_id(peer_id);
        Ok(fresh_peer)
    })
}
fn add_peer_sql(&self, peer: Peer) -> Result<NewPeerWithAddressesSql, StorageError> {
let new_peer_sql = NewPeerSql {
peer_id: generate_peer_id_as_i64(),
public_key: peer.public_key.to_hex(),
node_id: peer.node_id.to_hex(),
flags: peer.flags.to_i32(),
banned_until: peer.banned_until,
banned_reason: peer.banned_reason.clone(),
features: peer.features.to_i32(),
supported_protocols: serialize_protocols(&peer.supported_protocols),
added_at: peer.added_at,
user_agent: peer.user_agent.clone(),
metadata: serialize_metadata(&peer.metadata)?,
deleted_at: peer.deleted_at,
};
let mut new_addresses_sql = Vec::with_capacity(peer.addresses.len());
for address in peer.addresses.iter() {
if !address.address().is_empty() {
new_addresses_sql.push(NewMultiaddrWithStatsSql {
address_id: None, peer_id: 0, address: address.address().to_string(),
is_external: address.is_external(),
last_seen: address.last_seen(),
connection_attempts: i32::try_from(address.connection_attempts())?,
avg_initial_dial_time: duration_to_i64_ms_infallible(address.avg_initial_dial_time()),
initial_dial_time_sample_count: i32::try_from(address.initial_dial_time_sample_count())?,
avg_latency: duration_to_i64_ms_infallible(address.avg_latency()),
latency_sample_count: i32::try_from(address.latency_sample_count())?,
last_attempted: address.last_attempted(),
last_failed_reason: address.last_failed_reason().map(|s| s.to_string()),
quality_score: address.quality_score(),
source: serde_json::to_string(&address.source())
.map_err(|err| StorageError::UnexpectedResult(err.to_string()))?,
});
}
}
let new_peer_sql = NewPeerWithAddressesSql {
peer: new_peer_sql,
addresses: new_addresses_sql,
};
Ok(new_peer_sql)
}
fn add_peer_inner(
&self,
new_peer_sql: NewPeerWithAddressesSql,
conn: &mut SqliteConnection,
) -> Result<PeerId, StorageError> {
let node_id = new_peer_sql.peer.node_id.clone();
let inserted = diesel::insert_into(peers::table)
.values(&new_peer_sql.peer)
.execute(conn)?;
if inserted == 0 {
return Err(StorageError::UnexpectedResult(format!(
"Could not insert peer '{node_id}'"
)));
}
let peer_id = peers::table
.filter(peers::node_id.eq(new_peer_sql.peer.node_id))
.select(peers::peer_id)
.first::<i64>(conn)?;
let addresses: Vec<_> = new_peer_sql
.addresses
.clone()
.iter_mut()
.map(|addr| {
addr.peer_id = peer_id;
addr.clone()
})
.collect();
let inserted = diesel::insert_into(multi_addresses::table)
.values(&addresses)
.execute(conn)?;
if inserted != addresses.len() {
return Err(StorageError::UnexpectedResult(format!(
"Could not insert address '{:?}' for peer '{}'",
new_peer_sql
.addresses
.iter()
.map(|v| v.address.clone())
.collect::<Vec<_>>(),
node_id
)));
}
Ok(peer_id_from_i64(peer_id))
}
fn update_peer_sql(peer: Peer) -> Result<UpdatePeerWithAddressesSql, StorageError> {
let update_peer_sql = UpdatePeerSql {
node_id: Some(peer.node_id.to_hex()),
flags: Some(peer.flags.to_i32()),
banned_until: Some(peer.banned_until),
banned_reason: Some(peer.banned_reason.clone()),
features: Some(peer.features.to_i32()),
supported_protocols: Some(serialize_protocols(&peer.supported_protocols)),
user_agent: Some(peer.user_agent.clone()),
metadata: Some(serialize_metadata(&peer.metadata)?),
deleted_at: Some(peer.deleted_at),
};
let mut update_addresses_sql = Vec::with_capacity(peer.addresses.len());
for address in peer.addresses.iter() {
if !address.address().is_empty() {
update_addresses_sql.push(UpdateMultiaddrWithStatsSql {
address: Some(address.address().to_string()),
is_external: Some(address.is_external()),
last_seen: Some(address.last_seen()),
connection_attempts: Some(i32::try_from(address.connection_attempts())?),
avg_initial_dial_time: Some(duration_to_i64_ms_infallible(address.avg_initial_dial_time())),
initial_dial_time_sample_count: Some(i32::try_from(address.initial_dial_time_sample_count())?),
avg_latency: Some(duration_to_i64_ms_infallible(address.avg_latency())),
latency_sample_count: Some(i32::try_from(address.latency_sample_count())?),
last_attempted: Some(address.last_attempted()),
last_failed_reason: Some(address.last_failed_reason().map(|s| s.to_string())),
quality_score: Some(address.quality_score()),
source: Some(
serde_json::to_string(&address.source())
.map_err(|err| StorageError::UnexpectedResult(err.to_string()))?,
),
});
}
}
let update_peer_sql = UpdatePeerWithAddressesSql {
peer: update_peer_sql,
addresses: update_addresses_sql,
};
Ok(update_peer_sql)
}
fn update_peer_inner(
&self,
update_peer_sql: UpdatePeerWithAddressesSql,
conn: &mut SqliteConnection,
) -> Result<(), StorageError> {
diesel::update(
peers::table.filter(peers::node_id.eq(update_peer_sql.peer.node_id.clone().unwrap_or_default())),
)
.set(&update_peer_sql.peer)
.execute(conn)?;
let peer_id = peers::table
.filter(peers::node_id.eq(update_peer_sql.peer.node_id.clone().unwrap_or_default()))
.select(peers::peer_id)
.first::<i64>(conn)?;
let final_addr_list: Vec<String> = update_peer_sql
.addresses
.iter()
.filter_map(|a| a.address.clone())
.collect();
diesel::delete(
multi_addresses::table
.filter(multi_addresses::peer_id.eq(peer_id))
.filter(not(multi_addresses::address.eq_any(&final_addr_list))),
)
.execute(conn)?;
for address_update in update_peer_sql.addresses {
let updated = diesel::update(
multi_addresses::table
.filter(multi_addresses::address.eq(address_update.address.clone().unwrap_or_default()))
.filter(multi_addresses::peer_id.eq(peer_id)),
)
.set(&address_update)
.execute(conn)?;
if updated == 0 {
let new_address_sql = NewMultiaddrWithStatsSql::from((address_update.clone(), peer_id));
diesel::insert_into(multi_addresses::table)
.values(&new_address_sql)
.execute(conn)?;
}
}
Ok(())
}
pub fn peer_exists_by_node_id(&self, node_id: &NodeId) -> Result<Option<PeerId>, StorageError> {
let mut conn = self.connection.get_pooled_connection()?;
if let Ok(peer_id) = peers::table
.filter(peers::node_id.eq(node_id.to_hex()))
.select(peers::peer_id)
.first::<i64>(&mut conn)
{
Ok(Some(peer_id_from_i64(peer_id)))
} else {
Ok(None)
}
}
pub fn peer_exists_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<PeerId>, StorageError> {
let mut conn = self.connection.get_pooled_connection()?;
if let Ok(peer_id) = peers::table
.filter(peers::public_key.eq(public_key.to_hex()))
.select(peers::peer_id)
.first::<i64>(&mut conn)
{
Ok(Some(peer_id_from_i64(peer_id)))
} else {
Ok(None)
}
}
/// Marks the peer with `node_id` as deleted by setting `deleted_at` to the current UTC time.
///
/// Returns `Some(node_id)` if a row was updated and `None` if no peer has that node ID.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<Option<NodeId>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        // Rows are written and looked up with `to_hex()` elsewhere in this type; use the same encoding
        // here rather than depending on the `Display` formatting of `NodeId`.
        let affected = diesel::update(peers::table.filter(peers::node_id.eq(node_id.to_hex())))
            .set(peers::deleted_at.eq(chrono::Utc::now().naive_utc()))
            .execute(conn)?;
        Ok((affected > 0).then(|| node_id.clone()))
    })
}
/// Inserts or replaces the metadata entry `key` of the peer with `node_id`.
///
/// The stored metadata blob is read, deserialised, modified and written back inside one immediate
/// transaction. Returns the value previously stored under `key`, if any.
///
/// # Errors
/// Propagates connection, (de)serialisation and query errors, including "not found" when no peer has
/// that node ID.
pub fn set_metadata(&self, node_id: &NodeId, key: u8, data: Vec<u8>) -> Result<Option<Vec<u8>>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    // Rows are written and looked up with `to_hex()` elsewhere in this type; use the same encoding here
    // rather than depending on the `Display` formatting of `NodeId`.
    let node_id_hex = node_id.to_hex();
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let stored = peers::table
            .filter(peers::node_id.eq(&node_id_hex))
            .select(peers::metadata)
            .first::<Option<Vec<u8>>>(conn)?;
        let mut metadata = deserialize_metadata(stored)?;
        let previous = metadata.insert(key, data);
        let serialized = serialize_metadata(&metadata)?;
        diesel::update(peers::table.filter(peers::node_id.eq(&node_id_hex)))
            .set(peers::metadata.eq(serialized))
            .execute(conn)?;
        Ok(previous)
    })
}
/// Bans the peer with `node_id` for `ban_duration`, recording `banned_reason`.
///
/// The ban expiry is computed with `safe_future_datetime_from_duration`. Returns `Some(node_id)` if a row
/// was updated and `None` if no peer has that node ID.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn set_banned(
    &self,
    node_id: &NodeId,
    ban_duration: Duration,
    banned_reason: String,
) -> Result<Option<NodeId>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let banned_until = safe_future_datetime_from_duration(ban_duration).naive_utc();
        // Rows are written and looked up with `to_hex()` elsewhere in this type; use the same encoding
        // here rather than depending on the `Display` formatting of `NodeId`.
        let affected = diesel::update(peers::table.filter(peers::node_id.eq(node_id.to_hex())))
            .set((
                peers::banned_until.eq(banned_until),
                peers::banned_reason.eq(banned_reason),
            ))
            .execute(conn)?;
        Ok((affected > 0).then(|| node_id.clone()))
    })
}
/// Lifts the ban on the peer with `node_id` by clearing `banned_until` and emptying `banned_reason`.
///
/// Returns `Some(node_id)` if a row was updated and `None` if no peer has that node ID.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn reset_banned(&self, node_id: &NodeId) -> Result<Option<NodeId>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        // Rows are written and looked up with `to_hex()` elsewhere in this type; use the same encoding
        // here rather than depending on the `Display` formatting of `NodeId`.
        let affected = diesel::update(peers::table.filter(peers::node_id.eq(node_id.to_hex())))
            .set((
                peers::banned_until.eq(None::<NaiveDateTime>),
                peers::banned_reason.eq(String::new()),
            ))
            .execute(conn)?;
        Ok((affected > 0).then(|| node_id.clone()))
    })
}
/// Lifts every ban: for all peers with a non-null `banned_until`, clears it and empties `banned_reason`.
///
/// Returns the number of peer rows updated.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn reset_all_banned(&self) -> Result<usize, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let banned_peers = peers::table.filter(peers::banned_until.is_not_null());
        let cleared = diesel::update(banned_peers)
            .set((
                peers::banned_until.eq(None::<NaiveDateTime>),
                peers::banned_reason.eq(String::new()),
            ))
            .execute(conn)?;
        Ok(cleared)
    })
}
/// Resets connection-failure state on addresses of peers whose features include `COMMUNICATION_NODE`.
///
/// For every such address with a non-zero `connection_attempts`, sets the counter to 0 and clears
/// `last_attempted` and `last_failed_reason`. Returns the number of address rows updated.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn reset_offline_non_wallet_peers(&self) -> Result<usize, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        // Bitmask test on the peer's feature flags, expressed as raw SQL.
        let node_feature_sql = format!("features & {} != 0", PeerFeatures::COMMUNICATION_NODE.to_i32());
        let node_peer_ids = peers::table
            .select(peers::peer_id)
            .filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&node_feature_sql));
        let reset_count = diesel::update(multi_addresses::table)
            .filter(multi_addresses::peer_id.eq_any(node_peer_ids))
            .filter(multi_addresses::connection_attempts.ne(0))
            .set((
                multi_addresses::connection_attempts.eq(0),
                multi_addresses::last_attempted.eq(None::<NaiveDateTime>),
                multi_addresses::last_failed_reason.eq(None::<String>),
            ))
            .execute(conn)?;
        Ok(reset_count)
    })
}
/// Sets `last_seen` on the stored `address` of the peer with `node_id`.
///
/// Returns `Some(node_id)` if an address row was updated, `None` if the peer or the address is not stored.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn set_last_seen(
    &self,
    node_id: &NodeId,
    last_seen: NaiveDateTime,
    address: &Multiaddr,
) -> Result<Option<NodeId>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let affected = diesel::update(
            multi_addresses::table
                .filter(
                    // Rows are written and looked up with `to_hex()` elsewhere in this type; use the
                    // same encoding here rather than depending on the `Display` formatting of `NodeId`.
                    multi_addresses::peer_id.nullable().eq(peers::table
                        .filter(peers::node_id.eq(node_id.to_hex()))
                        .select(peers::peer_id)
                        .single_value()),
                )
                .filter(multi_addresses::address.eq(address.to_string())),
        )
        .set(multi_addresses::last_seen.eq(last_seen))
        .execute(conn)?;
        Ok((affected > 0).then(|| node_id.clone()))
    })
}
/// Clears `last_seen` on the stored `address` of the peer with `node_id`.
///
/// Returns `Some(node_id)` if an address row was updated, `None` if the peer or the address is not stored.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn reset_last_seen(&self, node_id: &NodeId, address: &Multiaddr) -> Result<Option<NodeId>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let affected = diesel::update(
            multi_addresses::table
                .filter(
                    // Rows are written and looked up with `to_hex()` elsewhere in this type; use the
                    // same encoding here rather than depending on the `Display` formatting of `NodeId`.
                    multi_addresses::peer_id.nullable().eq(peers::table
                        .filter(peers::node_id.eq(node_id.to_hex()))
                        .select(peers::peer_id)
                        .single_value()),
                )
                .filter(multi_addresses::address.eq(address.to_string())),
        )
        .set(multi_addresses::last_seen.eq(None::<NaiveDateTime>))
        .execute(conn)?;
        Ok((affected > 0).then(|| node_id.clone()))
    })
}
/// Records `last_failed_reason` on the stored `address` of the peer with `node_id`.
///
/// Returns `Some(node_id)` if an address row was updated, `None` if the peer or the address is not stored.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn set_last_failed_reason(
    &self,
    node_id: &NodeId,
    last_failed_reason: String,
    address: &Multiaddr,
) -> Result<Option<NodeId>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let affected = diesel::update(
            multi_addresses::table
                .filter(
                    // Rows are written and looked up with `to_hex()` elsewhere in this type; use the
                    // same encoding here rather than depending on the `Display` formatting of `NodeId`.
                    multi_addresses::peer_id.nullable().eq(peers::table
                        .filter(peers::node_id.eq(node_id.to_hex()))
                        .select(peers::peer_id)
                        .single_value()),
                )
                .filter(multi_addresses::address.eq(address.to_string())),
        )
        // NOTE(review): `sql_escape` is applied although the value is passed through diesel's `.eq(..)`
        // rather than spliced into SQL text; confirm the escaping is still wanted for the stored value.
        .set(multi_addresses::last_failed_reason.eq(sql_escape(&last_failed_reason)))
        .execute(conn)?;
        Ok((affected > 0).then(|| node_id.clone()))
    })
}
/// Clears `last_failed_reason` on the stored `address` of the peer with `node_id`.
///
/// Returns `Some(node_id)` if an address row was updated, `None` if the peer or the address is not stored.
///
/// # Errors
/// Propagates connection and query errors as `StorageError`.
pub fn reset_last_failed_reason(
    &self,
    node_id: &NodeId,
    address: &Multiaddr,
) -> Result<Option<NodeId>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    conn.immediate_transaction::<_, StorageError, _>(|conn| {
        let affected = diesel::update(
            multi_addresses::table
                .filter(
                    // Rows are written and looked up with `to_hex()` elsewhere in this type; use the
                    // same encoding here rather than depending on the `Display` formatting of `NodeId`.
                    multi_addresses::peer_id.nullable().eq(peers::table
                        .filter(peers::node_id.eq(node_id.to_hex()))
                        .select(peers::peer_id)
                        .single_value()),
                )
                .filter(multi_addresses::address.eq(address.to_string())),
        )
        .set(multi_addresses::last_failed_reason.eq(None::<String>))
        .execute(conn)?;
        Ok((affected > 0).then(|| node_id.clone()))
    })
}
fn peers_from_left_join_query(
results: Vec<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>,
) -> Result<Vec<Peer>, StorageError> {
use std::collections::HashMap;
if results.is_empty() {
return Ok(Vec::new());
}
let mut peer_map: HashMap<i64, (NewPeerSql, Vec<NewMultiaddrWithStatsSql>)> =
HashMap::with_capacity(results.len());
for (peer, addr_opt) in results {
let entry = peer_map.entry(peer.peer_id).or_insert_with(|| (peer, Vec::new()));
if let Some(addr) = addr_opt {
entry.1.push(addr);
}
}
let peers = peer_map
.into_iter()
.map(|(_, (peer, addresses))| Peer::try_from((peer, addresses)))
.collect::<Result<Vec<_>, _>>()?;
Ok(peers)
}
/// Groups the rows of a peers-inner-join-addresses query into one [`Peer`] per peer ID.
///
/// The order of the returned peers follows `HashMap` iteration order and is therefore unspecified.
///
/// # Errors
/// Fails if any grouped row set cannot be converted into a `Peer`.
fn peers_from_join_query(results: Vec<(NewPeerSql, NewMultiaddrWithStatsSql)>) -> Result<Vec<Peer>, StorageError> {
    let mut grouped: HashMap<i64, (NewPeerSql, Vec<NewMultiaddrWithStatsSql>)> =
        HashMap::with_capacity(results.len());
    for (peer_row, address_row) in results {
        let (_, addresses) = grouped
            .entry(peer_row.peer_id)
            .or_insert_with(|| (peer_row, Vec::new()));
        addresses.push(address_row);
    }
    let peers = grouped
        .into_values()
        .map(|rows| Peer::try_from(rows))
        .collect::<Result<Vec<_>, _>>()?;
    Ok(peers)
}
/// Finds all peers whose hex-encoded node ID or public key starts with the hex encoding of `start_bytes`.
///
/// - Empty input returns an empty list.
/// - Input longer than `NodeId::byte_size()` can only be a public-key prefix, so only public keys are
///   matched.
/// - Shorter input is matched against both node IDs and public keys in a single query, so a peer that
///   matches on both columns is loaded once. Running two queries and appending the rows made such a
///   peer's address rows appear twice in its grouped address list.
///
/// Input is validated before a pooled connection is requested.
///
/// # Errors
/// Returns `StorageError::MessageFormatError` if `start_bytes` is longer than a public key, and propagates
/// connection, query and conversion errors.
pub fn find_all_peers_match_partial_key(&self, start_bytes: &[u8]) -> Result<Vec<Peer>, StorageError> {
    if start_bytes.is_empty() {
        return Ok(Vec::new());
    }
    if start_bytes.len() > CommsPublicKey::key_length() {
        return Err(StorageError::MessageFormatError(format!(
            "Invalid length ({}) for peer NodeId or PublicKey, must be less than or equal to {}",
            start_bytes.len(),
            CommsPublicKey::key_length(),
        )));
    }
    let mut conn = self.connection.get_pooled_connection()?;
    // Hex output contains no LIKE wildcards, so only the trailing `%` acts as a pattern.
    let pattern = format!("{}%", hex::to_hex(start_bytes));
    let results = if start_bytes.len() > NodeId::byte_size() {
        peers::table
            .left_outer_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
            .filter(peers::public_key.like(pattern))
            .select((peers::all_columns, multi_addresses::all_columns.nullable()))
            .load::<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>(&mut conn)?
    } else {
        peers::table
            .left_outer_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
            .filter(
                peers::node_id
                    .like(pattern.clone())
                    .or(peers::public_key.like(pattern)),
            )
            .select((peers::all_columns, multi_addresses::all_columns.nullable()))
            .load::<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>(&mut conn)?
    };
    PeerDatabaseSql::peers_from_left_join_query(results)
}
/// Loads all stored peers, optionally restricted by `features`, inside a transaction.
///
/// See `get_all_peers_inner` for how `features` is interpreted.
///
/// # Errors
/// Propagates connection, query and conversion errors as `StorageError`.
pub fn get_all_peers(&self, features: Option<PeerFeatures>) -> Result<Vec<Peer>, StorageError> {
    let mut pooled = self.connection.get_pooled_connection()?;
    pooled.transaction::<_, StorageError, _>(|tx| {
        let peers = self.get_all_peers_inner(features, tx)?;
        Ok(peers)
    })
}
/// Loads all stored peers with their addresses using `conn`, optionally restricted by `features`.
///
/// - `None`: no feature filter.
/// - `Some(COMMUNICATION_CLIENT)`: the stored feature value must equal it exactly.
/// - any other value: the stored feature value must share at least one bit with it.
///
/// # Errors
/// Propagates query and conversion errors as `StorageError`.
fn get_all_peers_inner(
    &self,
    features: Option<PeerFeatures>,
    conn: &mut SqliteConnection,
) -> Result<Vec<Peer>, StorageError> {
    let mut query = peers::table
        .left_outer_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
        .into_boxed();
    match features {
        Some(wanted) if wanted == PeerFeatures::COMMUNICATION_CLIENT => {
            query = query.filter(peers::features.eq(wanted.to_i32()));
        },
        Some(wanted) => {
            query = query.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
                "features & {} != 0",
                wanted.to_i32()
            )));
        },
        None => {},
    }
    let rows = query.load::<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>(conn)?;
    PeerDatabaseSql::peers_from_left_join_query(rows)
}
/// Returns up to `number` peers that have at least one address, are not soft-deleted, and are not
/// currently banned (`banned_until` is null or in the past).
///
/// All matching rows are loaded and grouped first; the result is then cut down to `number` peers.
///
/// # Errors
/// Propagates connection, query and conversion errors as `StorageError`.
pub fn get_n_not_banned_or_deleted_peers(&self, number: usize) -> Result<Vec<Peer>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    let not_banned = peers::banned_until
        .is_null()
        .or(peers::banned_until.lt(chrono::Utc::now().naive_utc()));
    let rows = peers::table
        .inner_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
        .filter(not_banned)
        .filter(peers::deleted_at.is_null())
        .load::<(NewPeerSql, NewMultiaddrWithStatsSql)>(&mut conn)?;
    let mut peers = PeerDatabaseSql::peers_from_join_query(rows)?;
    peers.truncate(number);
    Ok(peers)
}
pub fn get_available_dial_candidates(
&self,
exclude_node_ids: &[NodeId],
n: Option<usize>,
transport_protocols: &[TransportProtocol],
exclude_failed: bool,
randomize: bool,
) -> Result<Vec<Peer>, StorageError> {
if let Some(n) = n &&
n == 0
{
warn!(target: LOG_TARGET, "'0' requested for 'get_available_dial_candidates'");
return Ok(Vec::new());
}
let mut conn = self.connection.get_pooled_connection()?;
let addr_filter_sql = Self::build_addr_filter_sql(transport_protocols);
let mut query = peers::table
.inner_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
.filter(
peers::banned_until
.is_null()
.or(peers::banned_until.lt(chrono::Utc::now().naive_utc())),
)
.filter(peers::deleted_at.is_null())
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
"features & {} != 0",
PeerFeatures::COMMUNICATION_NODE.to_i32()
)))
.select(peers::node_id)
.distinct()
.into_boxed();
if exclude_failed {
query = query.filter(multi_addresses::last_failed_reason.is_null());
}
if randomize {
query = query.order_by(diesel::dsl::sql::<diesel::sql_types::Integer>("RANDOM()"));
}
if let Some(filter_sql) = addr_filter_sql {
query = query.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&filter_sql));
}
if !exclude_node_ids.is_empty() {
let excluded_hex_ids: Vec<String> = exclude_node_ids.iter().map(|node_id| node_id.to_hex()).collect();
query = query.filter(peers::node_id.ne_all(excluded_hex_ids));
}
if let Some(limit) = n {
let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
query = query.limit(limit_i64);
}
let node_ids = query.load::<String>(&mut conn)?;
if node_ids.is_empty() {
return Ok(Vec::new());
}
self.get_peers_by_node_ids_str(&node_ids, false, &mut conn)
}
/// Loads the peer with `node_id` together with its addresses, or `None` if it is not stored.
///
/// # Errors
/// Propagates connection, query and conversion errors as `StorageError`.
pub fn get_peer_by_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, StorageError> {
    let mut pooled = self.connection.get_pooled_connection()?;
    let peer = self.get_peer_by_node_id_inner(node_id, &mut pooled)?;
    Ok(peer)
}
/// Loads the peer with `node_id` together with its addresses using `conn`.
///
/// Returns `None` if no peer has that node ID.
///
/// # Errors
/// Propagates query and conversion errors as `StorageError`.
fn get_peer_by_node_id_inner(
    &self,
    node_id: &NodeId,
    conn: &mut SqliteConnection,
) -> Result<Option<Peer>, StorageError> {
    let rows = peers::table
        .left_outer_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
        .filter(peers::node_id.eq(node_id.to_hex()))
        .select((peers::all_columns, multi_addresses::all_columns.nullable()))
        .load::<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>(conn)?;
    let peers = PeerDatabaseSql::peers_from_left_join_query(rows)?;
    // Take the first grouped peer by value.
    Ok(peers.into_iter().next())
}
pub fn get_peers_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<Peer>, StorageError> {
let mut conn = self.connection.get_pooled_connection()?;
let node_ids_hex = node_ids.iter().map(|id| id.to_hex()).collect::<Vec<_>>();
self.get_peers_by_node_ids_str(&node_ids_hex, false, &mut conn)
}
pub fn get_peer_public_keys_by_node_ids(&self, node_ids: &[NodeId]) -> Result<Vec<CommsPublicKey>, StorageError> {
let mut conn = self.connection.get_pooled_connection()?;
let node_ids = node_ids.iter().map(|id| id.to_hex()).collect::<Vec<_>>();
let public_keys = peers::table
.filter(peers::node_id.eq_any(node_ids))
.select(peers::public_key)
.load::<String>(&mut conn)?;
let public_keys = public_keys
.iter()
.map(|p| CommsPublicKey::from_hex(p))
.collect::<Result<Vec<_>, _>>()?;
Ok(public_keys)
}
fn get_peers_by_node_ids_str(
&self,
node_ids: &[String],
external_addresses_only: bool,
conn: &mut PooledConnection<ConnectionManager<SqliteConnection>>,
) -> Result<Vec<Peer>, StorageError> {
if external_addresses_only {
let results = peers::table
.left_outer_join(
multi_addresses::table.on(multi_addresses::peer_id
.eq(peers::peer_id)
.and(multi_addresses::is_external.eq(true))),
)
.filter(peers::node_id.eq_any(node_ids))
.select((peers::all_columns, multi_addresses::all_columns.nullable()))
.load::<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>(conn)?;
PeerDatabaseSql::peers_from_left_join_query(results)
} else {
let results = peers::table
.left_outer_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
.filter(peers::node_id.eq_any(node_ids))
.select((peers::all_columns, multi_addresses::all_columns.nullable()))
.load::<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>(conn)?;
PeerDatabaseSql::peers_from_left_join_query(results)
}
}
/// Loads all peers whose ban is still active, i.e. `banned_until` is set and later than the current UTC
/// time, together with their addresses.
///
/// # Errors
/// Propagates connection, query and conversion errors as `StorageError`.
pub fn get_banned_peers(&self) -> Result<Vec<Peer>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    let still_banned = peers::banned_until
        .is_not_null()
        .and(peers::banned_until.gt(chrono::Utc::now().naive_utc()));
    let rows = peers::table
        .left_outer_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
        .filter(still_banned)
        .select((peers::all_columns, multi_addresses::all_columns.nullable()))
        .load::<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>(&mut conn)?;
    PeerDatabaseSql::peers_from_left_join_query(rows)
}
/// Loads the peer with `public_key` together with its addresses, or `None` if it is not stored.
///
/// The peer columns are taken from the first returned row; addresses are collected from every row.
///
/// # Errors
/// Propagates connection, query and conversion errors as `StorageError`.
pub fn get_peer_by_public_key(&self, public_key: &CommsPublicKey) -> Result<Option<Peer>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    let rows = peers::table
        .left_outer_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
        .filter(peers::public_key.eq(public_key.to_hex()))
        .select((peers::all_columns, multi_addresses::all_columns.nullable()))
        .load::<(NewPeerSql, Option<NewMultiaddrWithStatsSql>)>(&mut conn)?;

    let mut rows = rows.into_iter();
    let Some((peer_row, first_address)) = rows.next() else {
        return Ok(None);
    };
    // Keep row order: the first row's address (if any), then those of the remaining rows.
    let address_rows: Vec<NewMultiaddrWithStatsSql> = first_address
        .into_iter()
        .chain(rows.filter_map(|(_, address)| address))
        .collect();
    let peer = Peer::try_from((peer_row, address_rows))?;
    Ok(Some(peer))
}
/// Loads all stored addresses of the peer with `node_id`, inside a transaction.
///
/// # Errors
/// Propagates connection, query and conversion errors, including "not found" when no peer has that
/// node ID.
pub fn get_addresses(&self, node_id: &NodeId) -> Result<MultiaddressesWithStats, StorageError> {
    let mut pooled = self.connection.get_pooled_connection()?;
    pooled.transaction::<_, StorageError, _>(|tx| {
        let peer_id = peers::table
            .filter(peers::node_id.eq(node_id.to_hex()))
            .select(peers::peer_id)
            .first::<i64>(tx)?;
        let address_rows = multi_addresses::table
            .filter(multi_addresses::peer_id.eq(peer_id))
            .load::<NewMultiaddrWithStatsSql>(tx)?;
        MultiaddressesWithStats::try_from(address_rows)
    })
}
/// Selects the distinct hex node IDs of peers that have at least one qualifying address.
///
/// Base conditions: the peer is not soft-deleted, is not currently banned, and is not in
/// `excluded_peers`. Optional conditions:
/// - `features`: `COMMUNICATION_CLIENT` must match exactly; any other value must share a bit with the
///   stored features;
/// - `peer_flags`: stored flags must equal the given flags;
/// - `stale_peer_threshold`: the address was never seen, or was seen within the threshold;
/// - `exclude_if_all_address_failed`: the address has been seen and has no recorded failure reason;
/// - `at_least_one_external_addresses`: the address is flagged external;
/// - `transport_protocols`: turned into an optional SQL address filter by `build_addr_filter_sql`;
/// - `n`: when set, results are ordered by `RANDOM()` and limited to `n`; `Some(0)` logs a warning and
///   returns an empty list.
///
/// # Errors
/// Propagates query errors as `StorageError`.
fn get_active_peer_node_ids(
    &self,
    excluded_peers: &[NodeId],
    features: Option<PeerFeatures>,
    peer_flags: Option<PeerFlags>,
    stale_peer_threshold: Option<Duration>,
    exclude_if_all_address_failed: bool,
    at_least_one_external_addresses: bool,
    n: Option<usize>,
    conn: &mut PooledConnection<ConnectionManager<SqliteConnection>>,
    transport_protocols: &[TransportProtocol],
) -> Result<Vec<String>, StorageError> {
    if n == Some(0) {
        warn!(target: LOG_TARGET, "'0' requested for 'get_active_peer_node_ids'");
        return Ok(Vec::new());
    }
    let excluded_node_ids_hex = excluded_peers.iter().map(|id| id.to_hex()).collect::<Vec<_>>();
    let addr_filter_sql = Self::build_addr_filter_sql(transport_protocols);
    let mut query = peers::table
        .inner_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
        .filter(
            peers::banned_until
                .is_null()
                .or(peers::banned_until.lt(chrono::Utc::now().naive_utc())),
        )
        .filter(peers::deleted_at.is_null())
        .filter(peers::node_id.ne_all(excluded_node_ids_hex))
        .distinct()
        .into_boxed();
    if let Some(flags) = peer_flags {
        query = query.filter(peers::flags.eq(flags.to_i32()));
    }
    if let Some(filter_sql) = addr_filter_sql {
        query = query.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&filter_sql));
    }
    if exclude_if_all_address_failed {
        query = query
            .filter(multi_addresses::last_seen.is_not_null())
            .filter(multi_addresses::last_failed_reason.is_null());
    }
    if at_least_one_external_addresses {
        query = query.filter(multi_addresses::is_external.eq(true));
    }
    if let Some(threshold) = stale_peer_threshold {
        let threshold = min(threshold, Duration::from_secs(i64::MAX.unsigned_abs() - 1));
        let max_age = chrono::Duration::from_std(threshold).unwrap_or(TimeDelta::MAX);
        // `NaiveDateTime - TimeDelta` panics when the result is not representable, which happens for
        // very large thresholds (the fallback above is `TimeDelta::MAX`). A cutoff that far in the past
        // cannot exclude any `last_seen` value, so in that case the filter is simply not applied.
        if let Some(stale_threshold) = chrono::Utc::now().naive_utc().checked_sub_signed(max_age) {
            query = query.filter(
                multi_addresses::last_seen
                    .is_null()
                    .or(multi_addresses::last_seen.ge(stale_threshold)),
            );
        }
    }
    if let Some(features) = features {
        if features == PeerFeatures::COMMUNICATION_CLIENT {
            query = query.filter(peers::features.eq(features.to_i32()));
        } else {
            query = query.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
                "features & {} != 0",
                features.to_i32()
            )));
        }
    }
    if let Some(n) = n {
        query = query
            .order_by(diesel::dsl::sql::<diesel::sql_types::Integer>("RANDOM()"))
            .limit(i64::try_from(n).unwrap_or(i64::MAX));
    }
    let node_ids_hex = query.select(peers::node_id).load::<String>(conn)?;
    Ok(node_ids_hex)
}
/// Returns up to `n` randomly chosen active peers, inside a transaction.
///
/// Node IDs are chosen by `get_active_peer_node_ids`, always requiring an address that has been seen and
/// has no recorded failure. `external_addresses_only` both requires an external address during selection
/// and limits the loaded addresses to external ones. `n == 0` logs a warning and returns an empty list.
///
/// # Errors
/// Propagates connection, query and conversion errors as `StorageError`.
pub fn get_n_random_active_peers(
    &self,
    n: usize,
    excluded_peers: &[NodeId],
    features: Option<PeerFeatures>,
    peer_flags: Option<PeerFlags>,
    stale_peer_threshold: Option<Duration>,
    external_addresses_only: bool,
    transport_protocols: &[TransportProtocol],
) -> Result<Vec<Peer>, StorageError> {
    if n == 0 {
        warn!(target: LOG_TARGET, "'0' requested for 'get_n_random_active_peers'");
        return Ok(Vec::new());
    }
    let mut pooled = self.connection.get_pooled_connection()?;
    pooled.transaction::<_, StorageError, _>(|tx| {
        // Addresses must have been seen and must not have a recorded failure.
        let exclude_if_all_address_failed = true;
        let selected_ids = self.get_active_peer_node_ids(
            excluded_peers,
            features,
            peer_flags,
            stale_peer_threshold,
            exclude_if_all_address_failed,
            external_addresses_only,
            Some(n),
            tx,
            transport_protocols,
        )?;
        self.get_peers_by_node_ids_str(&selected_ids, external_addresses_only, tx)
    })
}
/// Returns up to `n` randomly selected peers that have the `COMMUNICATION_NODE` feature bit set.
///
/// Peers are skipped when they are soft-deleted (`deleted_at` set), currently banned (`banned_until`
/// in the future) or listed in `exclude_node_ids`. Optional narrowing:
/// * `peer_flags` - when given, the stored flags must be exactly equal to this value.
/// * `transport_protocols` - when non-empty, at least one address must start with one of the protocol
///   prefixes (see `build_addr_filter_sql`).
/// * `known_good` - when `true`, only addresses with a recorded `last_seen` are considered.
///
/// The selected peers are loaded through `get_peers_by_node_ids_str` with its boolean argument set to
/// `true`. `n == 0` logs a warning and yields an empty list.
///
/// # Errors
/// Returns a `StorageError` if no pooled connection is available or if a query fails.
pub fn get_n_random_peers(
    &self,
    n: usize,
    exclude_node_ids: &[NodeId],
    peer_flags: Option<PeerFlags>,
    transport_protocols: &[TransportProtocol],
    known_good: bool,
) -> Result<Vec<Peer>, StorageError> {
    if n == 0 {
        warn!(target: LOG_TARGET, "'0' requested for 'get_n_random_peers'");
        return Ok(Vec::new());
    }

    let mut pooled = self.connection.get_pooled_connection()?;
    let excluded_hex: Vec<String> = exclude_node_ids.iter().map(|node_id| node_id.to_hex()).collect();
    let protocol_filter = Self::build_addr_filter_sql(transport_protocols);

    pooled.transaction::<_, StorageError, _>(|tx| {
        // A ban only counts while its expiry lies in the future.
        let not_banned = peers::banned_until
            .is_null()
            .or(peers::banned_until.lt(chrono::Utc::now().naive_utc()));
        let is_communication_node = diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
            "features & {} != 0",
            PeerFeatures::COMMUNICATION_NODE.to_i32()
        ));
        let row_limit = i64::try_from(n).unwrap_or(i64::MAX);

        let mut candidates = peers::table
            .inner_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
            .filter(peers::deleted_at.is_null())
            .filter(not_banned)
            .filter(is_communication_node)
            .filter(peers::node_id.ne_all(excluded_hex))
            .order_by(diesel::dsl::sql::<diesel::sql_types::Integer>("RANDOM()"))
            .limit(row_limit)
            .select(peers::node_id)
            .distinct()
            .into_boxed();

        // Optional filters are appended to the boxed query in a fixed order.
        if known_good {
            candidates = candidates.filter(multi_addresses::last_seen.is_not_null());
        }
        if let Some(required_flags) = peer_flags {
            candidates = candidates.filter(peers::flags.eq(required_flags.to_i32()));
        }
        if let Some(raw_sql) = protocol_filter {
            candidates = candidates.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&raw_sql));
        }

        let selected_ids: Vec<String> = candidates.load::<String>(tx)?;
        if selected_ids.is_empty() {
            Ok(Vec::new())
        } else {
            self.get_peers_by_node_ids_str(&selected_ids, true, tx)
        }
    })
}
pub fn get_seed_peers(&self) -> Result<Vec<Peer>, StorageError> {
let mut conn = self.connection.get_pooled_connection()?;
let results = peers::table
.inner_join(multi_addresses::table.on(multi_addresses::peer_id.eq(peers::peer_id)))
.filter(diesel::dsl::sql::<diesel::sql_types::Bool>(&format!(
"flags & {} != 0",
PeerFlags::SEED.to_i32()
)))
.load::<(NewPeerSql, NewMultiaddrWithStatsSql)>(&mut conn)?;
PeerDatabaseSql::peers_from_join_query(results)
}
/// Loads the `(peer_id, public_key, node_id)` triple of every row in the `peers` table.
///
/// No filter is applied, so soft-deleted and banned peers are included.
///
/// # Errors
/// Returns a `StorageError` if no pooled connection is available or the query fails.
fn get_peer_indexes(&self) -> Result<Vec<(i64, String, String)>, StorageError> {
    let mut conn = self.connection.get_pooled_connection()?;
    // `load` already produces the final `Vec`, so it is returned as-is without re-collecting.
    let peer_indexes = peers::table
        .select((peers::peer_id, peers::public_key, peers::node_id))
        .load::<(i64, String, String)>(&mut conn)?;
    Ok(peer_indexes)
}
/// Number of rows in the `peers` table, as reported by `get_peer_indexes`.
///
/// Any storage error is swallowed and reported as `0`.
pub fn size(&self) -> usize {
    match self.get_peer_indexes() {
        Ok(indexes) => indexes.len(),
        Err(_) => 0,
    }
}
/// Builds a raw SQL boolean expression matching addresses that start with any of the given
/// transport protocol prefixes, e.g. `(multi_addresses.address LIKE '<p1>%' OR ... )`.
///
/// Returns `None` when `transport_protocols` is empty (no filtering requested).
///
/// Only single quotes in the prefix are escaped (via `sql_escape`); `LIKE` wildcards in a prefix
/// would be interpreted by the database as wildcards.
fn build_addr_filter_sql(transport_protocols: &[TransportProtocol]) -> Option<String> {
    let clauses = transport_protocols
        .iter()
        .map(|protocol| format!("multi_addresses.address LIKE '{}%'", sql_escape(protocol.get_prefix())))
        .collect::<Vec<String>>();

    // One clause per protocol, so an empty list means there was nothing to filter on.
    (!clauses.is_empty()).then(|| format!("({})", clauses.join(" OR ")))
}
}
/// Escapes a value for embedding inside a single-quoted SQL string literal by doubling every
/// single quote. No other character is altered.
fn sql_escape(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
/// Row of the `node_identity` table, used both for loading and for inserting.
///
/// NOTE: `Queryable` maps columns to fields by position, so the field order must match the column
/// order of the query.
#[derive(Clone, Debug, Selectable, Queryable, Insertable, PartialEq, Eq)]
#[diesel(table_name = node_identity)]
pub struct NewThisPeerIdentitySql {
/// Public key of this node — presumably hex-encoded like `peers.public_key`; confirm against the writer.
pub public_key: String,
/// Node id of this node — presumably hex-encoded like `peers.node_id`; confirm against the writer.
pub node_id: String,
/// Peer feature bits stored as a signed 32-bit integer.
pub features: i32,
}
/// A `peers` row bundled with the `multi_addresses` rows that belong to it.
#[derive(Clone, Debug)]
pub struct NewPeerWithAddressesSql {
/// The peer row.
pub peer: NewPeerSql,
/// The address rows of that peer.
pub addresses: Vec<NewMultiaddrWithStatsSql>,
}
/// Row of the `peers` table, used both for loading and for inserting.
///
/// NOTE: `Queryable` maps columns to fields by position, so the field order must match the column
/// order of the query.
#[derive(Clone, Debug, Queryable, Insertable, PartialEq, Eq)]
#[diesel(table_name = peers)]
pub struct NewPeerSql {
/// Peer identifier; converted to `u64` when a `Peer` is built from this row.
pub peer_id: i64,
/// Hex-encoded public key (decoded with `CommsPublicKey::from_hex`).
pub public_key: String,
/// Hex-encoded node id (decoded with `NodeId::from_hex`).
pub node_id: String,
/// `PeerFlags` bits; must fit into a `u8` to convert back into a `Peer`.
pub flags: i32,
/// Ban expiry; the random-peer query treats `NULL` or a past timestamp as "not banned".
pub banned_until: Option<NaiveDateTime>,
/// Reason recorded for the ban.
pub banned_reason: String,
/// `PeerFeatures` bits; must fit into a `u32` to convert back into a `Peer`.
pub features: i32,
/// Supported protocols as text, decoded with `deserialize_protocols`.
pub supported_protocols: String,
/// Timestamp at which the peer was added.
pub added_at: NaiveDateTime,
/// User agent string of the peer.
pub user_agent: String,
/// Encoded peer metadata, decoded with `deserialize_metadata`; `None` when absent.
pub metadata: Option<Vec<u8>>,
/// Soft-delete marker; the random-peer query only considers rows where this is `NULL`.
pub deleted_at: Option<NaiveDateTime>,
}
/// Changeset for updating a row of the `peers` table.
///
/// Every field is wrapped in an outer `Option`. With diesel's default `AsChangeset` behaviour a
/// field set to `None` is left out of the `UPDATE`. For nullable columns the inner `Option` carries
/// the new value, so `Some(None)` writes `NULL`.
#[derive(Clone, Debug, AsChangeset, PartialEq, Eq)]
#[diesel(table_name = peers)]
pub struct UpdatePeerSql {
/// New node id, if it should change.
pub node_id: Option<String>,
/// New `PeerFlags` bits, if they should change.
pub flags: Option<i32>,
/// New ban expiry; `Some(None)` clears it.
pub banned_until: Option<Option<NaiveDateTime>>,
/// New ban reason, if it should change.
pub banned_reason: Option<String>,
/// New `PeerFeatures` bits, if they should change.
pub features: Option<i32>,
/// New supported-protocols text, if it should change.
pub supported_protocols: Option<String>,
/// New user agent, if it should change.
pub user_agent: Option<String>,
/// New encoded metadata; `Some(None)` clears it.
pub metadata: Option<Option<Vec<u8>>>,
/// New soft-delete marker; `Some(None)` clears it.
pub deleted_at: Option<Option<NaiveDateTime>>,
}
/// A `peers` changeset bundled with the changesets for that peer's addresses.
#[derive(Clone, Debug)]
pub struct UpdatePeerWithAddressesSql {
/// Changes for the peer row.
pub peer: UpdatePeerSql,
/// Changes for the peer's address rows.
pub addresses: Vec<UpdateMultiaddrWithStatsSql>,
}
/// Row of the `multi_addresses` table, used both for loading and for inserting.
///
/// NOTE: `Queryable` maps columns to fields by position, so the field order must match the column
/// order of the query.
#[derive(Clone, Debug, Queryable, Insertable, PartialEq, Eq)]
#[diesel(table_name = multi_addresses)]
pub struct NewMultiaddrWithStatsSql {
/// Row id; `None` for rows built in code — presumably assigned by the database on insert.
pub address_id: Option<i32>,
/// Id of the owning row in `peers`.
pub peer_id: i64,
/// Multiaddr in text form; an empty string is loaded as an empty placeholder address.
pub address: String,
/// Whether the address is flagged as external.
pub is_external: bool,
/// Last time the address was seen, if ever.
pub last_seen: Option<NaiveDateTime>,
/// Connection attempt counter; must be non-negative to load.
pub connection_attempts: i32,
/// Average initial dial time in milliseconds.
pub avg_initial_dial_time: Option<i64>,
/// Number of samples behind `avg_initial_dial_time`; must be non-negative to load.
pub initial_dial_time_sample_count: i32,
/// Average latency in milliseconds.
pub avg_latency: Option<i64>,
/// Number of samples behind `avg_latency`; must be non-negative to load.
pub latency_sample_count: i32,
/// Last time a connection was attempted, if ever.
pub last_attempted: Option<NaiveDateTime>,
/// Reason of the last failure, if any.
pub last_failed_reason: Option<String>,
/// Quality score of the address, if one was computed.
pub quality_score: Option<i32>,
/// JSON-serialised `PeerAddressSource`.
pub source: String,
}
/// Changeset for updating a row of the `multi_addresses` table.
///
/// Every field is wrapped in an outer `Option`. With diesel's default `AsChangeset` behaviour a
/// field set to `None` is left out of the `UPDATE`. For nullable columns the inner `Option` carries
/// the new value, so `Some(None)` writes `NULL`.
#[derive(Clone, Debug, AsChangeset, PartialEq, Eq)]
#[diesel(table_name = multi_addresses)]
pub struct UpdateMultiaddrWithStatsSql {
/// New multiaddr text, if it should change.
pub address: Option<String>,
/// New external flag, if it should change.
pub is_external: Option<bool>,
/// New last-seen timestamp; `Some(None)` clears it.
pub last_seen: Option<Option<NaiveDateTime>>,
/// New connection attempt counter, if it should change.
pub connection_attempts: Option<i32>,
/// New average initial dial time in milliseconds; `Some(None)` clears it.
pub avg_initial_dial_time: Option<Option<i64>>,
/// New sample count for the initial dial time, if it should change.
pub initial_dial_time_sample_count: Option<i32>,
/// New average latency in milliseconds; `Some(None)` clears it.
pub avg_latency: Option<Option<i64>>,
/// New sample count for the latency, if it should change.
pub latency_sample_count: Option<i32>,
/// New last-attempted timestamp; `Some(None)` clears it.
pub last_attempted: Option<Option<NaiveDateTime>>,
/// New last failure reason; `Some(None)` clears it.
pub last_failed_reason: Option<Option<String>>,
/// New quality score; `Some(None)` clears it.
pub quality_score: Option<Option<i32>>,
/// New JSON-serialised `PeerAddressSource`, if it should change.
pub source: Option<String>,
}
/// Encodes a list of protocol ids as one comma-separated string.
///
/// Each id is converted with `String::from_utf8_lossy`, so bytes that are not valid UTF-8 are
/// replaced. An id that itself contains a comma will not survive a round trip through
/// `deserialize_protocols`, which splits on commas.
pub fn serialize_protocols(protocols: &[ProtocolId]) -> String {
    let mut joined = String::new();
    for (position, protocol) in protocols.iter().enumerate() {
        if position > 0 {
            joined.push(',');
        }
        joined.push_str(&String::from_utf8_lossy(protocol));
    }
    joined
}
/// Decodes the comma-separated text produced by `serialize_protocols` back into protocol ids.
///
/// An empty input yields an empty list; otherwise every comma-separated piece (including empty
/// pieces) becomes one protocol id.
pub fn deserialize_protocols(data: &str) -> Vec<ProtocolId> {
    if data.is_empty() {
        return Vec::new();
    }

    let mut protocols = Vec::new();
    for name in data.split(',') {
        protocols.push(Bytes::from(name.to_string()));
    }
    protocols
}
/// Encodes peer metadata as JSON bytes for storage.
///
/// Returns `Ok(None)` for an empty map so that nothing is stored in that case.
///
/// # Errors
/// Returns a `StorageError` if JSON serialisation fails.
pub fn serialize_metadata(metadata: &HashMap<u8, Vec<u8>>) -> Result<Option<Vec<u8>>, StorageError> {
    if metadata.is_empty() {
        return Ok(None);
    }
    let encoded = serde_json::to_vec(metadata)?;
    Ok(Some(encoded))
}
/// Decodes stored JSON bytes back into peer metadata.
///
/// A missing value or an empty byte buffer yields an empty map.
///
/// # Errors
/// Returns `StorageError::JsonError` if the bytes are not valid JSON for the expected map type.
pub fn deserialize_metadata(data: Option<Vec<u8>>) -> Result<HashMap<u8, Vec<u8>>, StorageError> {
    // Treat "present but empty" exactly like "absent".
    match data.filter(|raw| !raw.is_empty()) {
        Some(raw) => serde_json::from_slice(&raw).map_err(StorageError::JsonError),
        None => Ok(HashMap::new()),
    }
}
impl From<MultiaddrWithStats> for UpdateMultiaddrWithStatsSql {
fn from(address: MultiaddrWithStats) -> Self {
UpdateMultiaddrWithStatsSql::from(&address)
}
}
/// Converts an optional duration to whole milliseconds as `i64`.
///
/// `None` stays `None`. A millisecond count that does not fit into `i64` is logged as a warning
/// and saturated to `i64::MAX` instead of failing.
fn duration_to_i64_ms_infallible(duration: Option<Duration>) -> Option<i64> {
    let millis = duration?.as_millis();
    let clamped = i64::try_from(millis).unwrap_or_else(|e| {
        warn!(target: LOG_TARGET, "duration_to_i64_ms_infallible {duration:?} conversion error: {e}");
        i64::MAX
    });
    Some(clamped)
}
/// Converts a `u32` to `i32`, saturating at `i32::MAX` when the value does not fit.
///
/// A warning is logged only when saturation actually happens.
fn u32_to_i32_infallible(value: u32) -> i32 {
    // `unwrap_or_else` defers the fallback: with `unwrap_or` the block (and its warning) would be
    // evaluated on every call, including successful conversions.
    i32::try_from(value).unwrap_or_else(|_| {
        warn!(target: LOG_TARGET, "u32_to_i32_infallible conversion error");
        i32::MAX
    })
}
/// Builds a full changeset from an address: every field is set (`Some(..)`), so all columns are
/// written, including `NULL` for statistics that are currently unset.
impl From<&MultiaddrWithStats> for UpdateMultiaddrWithStatsSql {
    fn from(address: &MultiaddrWithStats) -> Self {
        // Counters and durations are converted with the saturating helpers so this conversion
        // cannot fail.
        let connection_attempts = u32_to_i32_infallible(address.connection_attempts());
        let avg_initial_dial_time = duration_to_i64_ms_infallible(address.avg_initial_dial_time());
        let initial_dial_time_sample_count = u32_to_i32_infallible(address.initial_dial_time_sample_count());
        let avg_latency = duration_to_i64_ms_infallible(address.avg_latency());
        let latency_sample_count = u32_to_i32_infallible(address.latency_sample_count());
        // A source that fails to serialise is stored as an empty string.
        let source = serde_json::to_string(&address.source()).unwrap_or_default();

        Self {
            address: Some(address.to_string()),
            is_external: Some(address.is_external()),
            last_seen: Some(address.last_seen()),
            connection_attempts: Some(connection_attempts),
            avg_initial_dial_time: Some(avg_initial_dial_time),
            initial_dial_time_sample_count: Some(initial_dial_time_sample_count),
            avg_latency: Some(avg_latency),
            latency_sample_count: Some(latency_sample_count),
            last_attempted: Some(address.last_attempted()),
            last_failed_reason: Some(address.last_failed_reason().map(|v| v.to_string())),
            quality_score: Some(address.quality_score()),
            source: Some(source),
        }
    }
}
/// Rebuilds a `Peer` from its `peers` row and the associated `multi_addresses` rows.
///
/// # Errors
/// Returns a `StorageError` when any stored value cannot be converted: a negative `peer_id`,
/// invalid hex in the public key or node id, address rows that fail to convert, flag or feature
/// bits that are out of range or unknown, or metadata that is not valid JSON.
impl TryFrom<(NewPeerSql, Vec<NewMultiaddrWithStatsSql>)> for Peer {
    type Error = StorageError;

    fn try_from(
        (peer_query, addresses_query): (NewPeerSql, Vec<NewMultiaddrWithStatsSql>),
    ) -> Result<Self, Self::Error> {
        // Ids are expected to come from `generate_peer_id_as_i64()` and be non-negative, but the value
        // is read from disk: report a bad row as an error instead of panicking, in line with the
        // `flags` and `features` conversions below.
        let peer_id = u64::try_from(peer_query.peer_id)?;

        Ok(Peer::new_with_stats(
            // An id of zero is treated as "no id".
            Some(peer_id).filter(|&id| id != 0),
            CommsPublicKey::from_hex(&peer_query.public_key)?,
            NodeId::from_hex(&peer_query.node_id)?,
            MultiaddressesWithStats::try_from(addresses_query)?,
            PeerFlags::from_bits(u8::try_from(peer_query.flags)?)
                .ok_or_else(|| StorageError::UnexpectedResult("Peer flags are invalid".to_string()))?,
            peer_query.banned_until,
            peer_query.banned_reason,
            PeerFeatures::from_bits(u32::try_from(peer_query.features)?)
                .ok_or_else(|| StorageError::UnexpectedResult("Peer features are invalid".to_string()))?,
            deserialize_protocols(&peer_query.supported_protocols),
            peer_query.added_at,
            peer_query.user_agent,
            deserialize_metadata(peer_query.metadata)?,
            peer_query.deleted_at,
        ))
    }
}
/// Converts an optional millisecond count read from storage into a `Duration`.
///
/// `None` stays `None`.
///
/// # Errors
/// Returns `StorageError::UnexpectedResult("Invalid duration")` for a negative value.
fn i64_to_duration(val: Option<i64>) -> Result<Option<Duration>, StorageError> {
    let Some(stored_ms) = val else {
        return Ok(None);
    };
    match u64::try_from(stored_ms) {
        Ok(ms) => Ok(Some(Duration::from_millis(ms))),
        Err(_) => Err(StorageError::UnexpectedResult("Invalid duration".to_string())),
    }
}
impl TryFrom<Vec<NewMultiaddrWithStatsSql>> for MultiaddressesWithStats {
type Error = StorageError;
fn try_from(addresses_query: Vec<NewMultiaddrWithStatsSql>) -> Result<Self, Self::Error> {
let mut addresses = Vec::new();
for addr in addresses_query {
let address = if addr.address.is_empty() {
MultiaddrWithStats::new(Multiaddr::empty(), PeerAddressSource::Config)
} else {
MultiaddrWithStats::new_with_stats(
Multiaddr::from_str(&addr.address).map_err(|e| StorageError::UnexpectedResult(e.to_string()))?,
addr.last_seen,
u32::try_from(addr.connection_attempts)?,
i64_to_duration(addr.avg_initial_dial_time)?,
u32::try_from(addr.initial_dial_time_sample_count)?,
i64_to_duration(addr.avg_latency)?,
u32::try_from(addr.latency_sample_count)?,
addr.last_attempted,
addr.last_failed_reason,
addr.quality_score,
serde_json::from_str(&addr.source).map_err(StorageError::JsonError)?,
)
};
addresses.push(address);
}
if addresses.is_empty() {
addresses.push(MultiaddrWithStats::new(Multiaddr::empty(), PeerAddressSource::Config));
}
Ok(MultiaddressesWithStats::from(addresses))
}
}
impl From<(UpdateMultiaddrWithStatsSql, i64)> for NewMultiaddrWithStatsSql {
fn from((address, peer_id): (UpdateMultiaddrWithStatsSql, i64)) -> Self {
NewMultiaddrWithStatsSql {
address_id: None,
peer_id,
address: address.address.unwrap_or_default(),
is_external: address.is_external.unwrap_or_default(),
last_seen: address.last_seen.unwrap_or_default(),
connection_attempts: address.connection_attempts.unwrap_or_default(),
avg_initial_dial_time: address.avg_initial_dial_time.unwrap_or_default(),
initial_dial_time_sample_count: address.initial_dial_time_sample_count.unwrap_or_default(),
avg_latency: address.avg_latency.unwrap_or_default(),
latency_sample_count: address.latency_sample_count.unwrap_or_default(),
last_attempted: address.last_attempted.unwrap_or_default(),
last_failed_reason: address.last_failed_reason.unwrap_or_default(),
quality_score: address.quality_score.unwrap_or_default(),
source: address.source.unwrap_or_default(),
}
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::indexing_slicing)]
use std::time::Duration;
use chrono::TimeDelta;
use diesel::{self, ExpressionMethods, QueryDsl, RunQueryDsl};
use tari_common_sqlite::connection::DbConnection;
use tari_utilities::{ByteArray, hex::Hex};
use crate::{
multiaddr::Multiaddr,
net_address::{MultiaddressesWithStats, PeerAddressSource},
peer_manager::{
NodeId,
Peer,
PeerFeatures,
PeerFlags,
create_test_peer,
create_test_peer_add_internal_addresses,
create_test_peer_internal_addresses_only,
database::{MIGRATIONS, NewMultiaddrWithStatsSql, NewPeerSql, PeerDatabaseSql},
manager::create_test_peer_with_onion_address,
storage::{
database::{duration_to_i64_ms_infallible, u32_to_i32_infallible},
schema::{multi_addresses, peers},
},
},
protocol::ProtocolId,
types::{CommsPublicKey, TransportProtocol},
};
// Adds a peer, checks the stored rows against the expected SQL representation, then mutates the peer
// (ban, protocol, metadata, new address, address stats) and checks the update round-trips.
#[test]
fn test_add_update_peer_with_addresses() {
let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
let peers_db = PeerDatabaseSql::new(
db_connection,
&create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
)
.unwrap();
let mut new_peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
// NOTE(review): `add_peer_sql` is used here to obtain the expected row values; whether it also
// inserts is not visible from this test — the count below must be 1 either way.
let mut new_peer_sql = peers_db.add_peer_sql(new_peer.clone()).unwrap();
peers_db.add_or_update_peer(new_peer.clone()).unwrap();
let mut conn = peers_db.connection.get_pooled_connection().unwrap();
// Exactly one peer row and one address row per address of the peer.
let count: i64 = peers::table.count().get_result(&mut conn).unwrap();
assert_eq!(count, 1);
let count: i64 = multi_addresses::table.count().get_result(&mut conn).unwrap();
assert_eq!(count, i64::try_from(new_peer.addresses.len()).unwrap());
let peer_query: NewPeerSql = peers::table
.filter(peers::node_id.eq(new_peer.node_id.to_hex()))
.first::<NewPeerSql>(&mut conn)
.unwrap();
let peer_id: i64 = peers::table
.filter(peers::node_id.eq(new_peer_sql.peer.node_id.clone()))
.select(peers::peer_id)
.first::<i64>(&mut conn)
.unwrap();
// Align the expected row with the id read back from the database before comparing.
new_peer_sql.peer.peer_id = peer_id;
assert_eq!(peer_query, new_peer_sql.peer);
let addresses_query: Vec<NewMultiaddrWithStatsSql> = multi_addresses::table
.filter(multi_addresses::peer_id.eq(peer_query.peer_id))
.load::<NewMultiaddrWithStatsSql>(&mut conn)
.unwrap();
// Same alignment for the address rows: copy the stored ids into the expected rows.
for (address_query, mut address) in addresses_query.iter().zip(new_peer_sql.addresses) {
address.address_id = address_query.address_id;
address.peer_id = peer_id;
assert_eq!(address_query, &address);
}
// The raw rows convert back into the original peer, and so does the database accessor.
let peer_from_query = Peer::try_from((peer_query, addresses_query)).unwrap();
assert_eq!(peer_from_query, new_peer);
let peer_from_db = peers_db.get_peer_by_node_id(&new_peer.node_id).unwrap().unwrap();
assert_eq!(peer_from_db, new_peer);
// Update phase: change several peer fields and address statistics, then store again.
new_peer.ban_for(Duration::from_secs(12345), "Misbehave".to_string());
new_peer
.supported_protocols
.push(ProtocolId::from_static(b"Test Protocol 1.0"));
new_peer.metadata.insert(1, vec![1, 2, 3]);
new_peer.metadata.insert(2, vec![4, 5, 6]);
let new_addr: Multiaddr = "/ip4/127.0.0.1/udt/sctp/5678".parse().unwrap();
new_peer
.addresses
.add_or_update_addresses(std::slice::from_ref(&new_addr), &PeerAddressSource::Config);
let mut address_to_update = new_peer.addresses.addresses().first().unwrap().clone();
address_to_update.update_latency(Duration::from_millis(123));
address_to_update.update_initial_dial_time(Duration::from_millis(1234));
address_to_update.mark_last_seen_now();
new_peer
.addresses
.merge(&MultiaddressesWithStats::new(vec![address_to_update.clone()]));
peers_db.add_or_update_peer(new_peer.clone()).unwrap();
// The updated peer and its addresses must read back unchanged.
let peer_from_db = peers_db.get_peer_by_node_id(&new_peer.node_id).unwrap().unwrap();
assert_eq!(peer_from_db, new_peer);
assert_eq!(peer_from_db.addresses, new_peer.addresses);
let addresses_from_db = peers_db.get_addresses(&new_peer.node_id).unwrap();
assert_eq!(addresses_from_db, new_peer.addresses);
}
// Checks that `get_n_random_active_peers` honours the transport protocol filter.
#[test]
fn test_filtering_peers_by_transport_protocols() {
let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
let peers_db = PeerDatabaseSql::new(
db_connection,
&create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
)
.unwrap();
let transport_protocols = vec![TransportProtocol::Ipv4, TransportProtocol::Ipv6];
// 20 node peers and 20 client peers, each with external and internal addresses (asserted below).
for _i in 0..20 {
let peer = create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_NODE);
peers_db.add_or_update_peer(peer).unwrap();
let peer = create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_CLIENT);
peers_db.add_or_update_peer(peer).unwrap();
}
assert_eq!(peers_db.size(), 40);
// All 40 peers match the Ipv4/Ipv6 filter.
let nodes_with_all_addresses = peers_db
.get_n_random_active_peers(100, &[], None, None, None, false, &transport_protocols)
.unwrap();
assert_eq!(nodes_with_all_addresses.len(), 40);
// No peer has an onion address yet.
let nodes_with_onion_addresses = peers_db
.get_n_random_active_peers(100, &[], None, None, None, false, &[TransportProtocol::Onion])
.unwrap();
assert!(nodes_with_onion_addresses.is_empty());
assert!(
nodes_with_all_addresses
.iter()
.all(|p| { p.addresses.addresses().iter().any(|addr| addr.is_external()) })
);
assert!(
nodes_with_all_addresses
.iter()
.all(|p| { p.addresses.addresses().iter().any(|addr| !addr.is_external()) })
);
// After adding a single onion peer, the onion filter returns exactly that one peer.
let onion_peer = create_test_peer_with_onion_address(false, PeerFeatures::COMMUNICATION_NODE);
peers_db.add_or_update_peer(onion_peer).unwrap();
let node_with_onion_addresses = peers_db
.get_n_random_active_peers(100, &[], None, None, None, false, &[TransportProtocol::Onion])
.unwrap();
assert_eq!(node_with_onion_addresses.len(), 1);
}
// Placeholder: the body is empty and the test is ignored, so nothing is verified here.
// TODO(review): either implement the batch add/update checks or remove this stub.
#[ignore]
#[test]
fn test_batch_add_update_peers_with_addresses() {
}
// Checks the `peer_flags` filter of `get_n_random_peers`: it matches the flags exactly, so
// `Some(SEED)` returns only seed peers and `Some(NONE)` returns only non-seed peers.
#[test]
#[allow(clippy::too_many_lines)]
fn test_seed_peer_exclusion() {
let transport_protocols = TransportProtocol::get_all();
let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
let peers_db = PeerDatabaseSql::new(
db_connection,
&create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
)
.unwrap();
// 12 node peers; every third one (indices 0, 3, 6, 9) is flagged as a seed peer.
let mut node_peers = Vec::with_capacity(12);
for i in 0..12 {
let mut peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
if i % 3 == 0 {
peer.flags = PeerFlags::SEED;
}
node_peers.push(peer.clone());
peers_db.add_or_update_peer(peer).unwrap();
}
// Without a flag filter all 12 peers are eligible.
let random_peers = peers_db
.get_n_random_peers(12, &[], None, &transport_protocols, false)
.unwrap();
assert_eq!(random_peers.len(), 12);
// Seed filter: exactly the 4 seed peers.
let random_peers = peers_db
.get_n_random_peers(12, &[], Some(PeerFlags::SEED), &transport_protocols, false)
.unwrap();
assert_eq!(random_peers.len(), 4);
assert!(random_peers.iter().all(|p| p.flags == PeerFlags::SEED));
let random_peers = peers_db
.get_n_random_peers(1, &[], Some(PeerFlags::SEED), &transport_protocols, false)
.unwrap();
assert_eq!(random_peers[0].flags, PeerFlags::SEED);
// No-flag filter: exactly the 8 remaining peers.
let random_peers = peers_db
.get_n_random_peers(12, &[], Some(PeerFlags::NONE), &transport_protocols, false)
.unwrap();
assert_eq!(random_peers.len(), 8);
assert!(random_peers.iter().all(|p| p.flags == PeerFlags::NONE));
let random_peers = peers_db
.get_n_random_peers(1, &[], Some(PeerFlags::NONE), &transport_protocols, false)
.unwrap();
assert_eq!(random_peers[0].flags, PeerFlags::NONE);
}
// Walks through most query and mutation helpers of `PeerDatabaseSql` on one shared database.
// The steps build on each other's state, so their order matters.
#[test]
#[allow(clippy::too_many_lines)]
fn test_various_queries() {
let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
let peers_db = PeerDatabaseSql::new(
db_connection,
&create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
)
.unwrap();
let transport_protocols = TransportProtocol::get_all();
// 12 node peers (indices 0, 4 and 8 flagged as seeds) plus 12 client ("wallet") peers.
let mut node_peers = Vec::with_capacity(12);
for i in 0..12 {
let mut peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
if i % 4 == 0 {
peer.flags = PeerFlags::SEED;
}
node_peers.push(peer.clone());
peers_db.add_or_update_peer(peer).unwrap();
}
let mut wallet_peers = Vec::with_capacity(12);
for _i in 0..12 {
let peer = create_test_peer(false, PeerFeatures::COMMUNICATION_CLIENT);
wallet_peers.push(peer.clone());
peers_db.add_or_update_peer(peer).unwrap();
}
assert_eq!(peers_db.size(), 24);
// Partial key lookup: every proper prefix of the node id and of the public key finds the peer.
for i in 1..NodeId::byte_size() {
let matches = peers_db
.find_all_peers_match_partial_key(&node_peers[0].node_id.as_bytes()[0..i])
.unwrap();
assert!(matches.contains(&node_peers[0]));
}
for i in 1..CommsPublicKey::key_length() {
let matches = peers_db
.find_all_peers_match_partial_key(&node_peers[0].public_key.as_bytes()[0..i])
.unwrap();
assert!(matches.contains(&node_peers[0]));
}
// Soft delete: the peer stays retrievable but carries a `deleted_at` timestamp.
peers_db.soft_delete_peer(&node_peers[1].node_id).unwrap();
let deleted_peer = peers_db.get_peer_by_node_id(&node_peers[1].node_id).unwrap().unwrap();
assert!(deleted_peer.deleted_at.is_some());
peers_db.soft_delete_peer(&wallet_peers[1].node_id).unwrap();
let deleted_peer = peers_db.get_peer_by_node_id(&wallet_peers[1].node_id).unwrap().unwrap();
assert!(deleted_peer.deleted_at.is_some());
// Existence and lookup by node id / public key.
assert!(
peers_db
.peer_exists_by_node_id(&node_peers[2].node_id)
.unwrap()
.is_some()
);
let peer = peers_db
.get_peer_by_public_key(&node_peers[3].public_key)
.unwrap()
.unwrap();
assert_eq!(peer, node_peers[3]);
assert!(
peers_db
.peer_exists_by_public_key(&node_peers[4].public_key)
.unwrap()
.is_some()
);
// `get_all_peers(None)` still reports all 24 peers, including the two soft-deleted ones.
let all_peers = peers_db.get_all_peers(None).unwrap();
assert_eq!(all_peers.len(), 24);
// Ban one node peer and one wallet peer.
peers_db
.set_banned(
&node_peers[4].node_id,
Duration::from_secs(12345),
"Misbehaviour is punished".to_string(),
)
.unwrap();
peers_db
.set_banned(
&wallet_peers[4].node_id,
Duration::from_secs(12345),
"Misbehaviour is punished".to_string(),
)
.unwrap();
// 24 peers minus 2 soft-deleted minus 2 banned leaves 20.
let n_peers = peers_db.get_n_not_banned_or_deleted_peers(24).unwrap();
assert_eq!(n_peers.len(), 20);
assert!(
!n_peers
.iter()
.any(|n| n.node_id == node_peers[1].node_id || n.node_id == node_peers[4].node_id)
);
assert!(
!n_peers
.iter()
.any(|n| n.node_id == wallet_peers[1].node_id || n.node_id == wallet_peers[4].node_id)
);
// Last-seen: set every address of one node peer and one wallet peer to "two minutes ago" and read it back.
let last_seen = chrono::Utc::now().naive_utc() -
chrono::Duration::from_std(Duration::from_secs(120)).unwrap_or(TimeDelta::MAX);
for address in node_peers[8].addresses.addresses() {
peers_db
.set_last_seen(&node_peers[8].node_id, last_seen, address.address())
.unwrap();
}
assert_eq!(
peers_db
.get_peer_by_node_id(&node_peers[8].node_id)
.unwrap()
.unwrap()
.last_seen()
.unwrap(),
last_seen
);
for address in wallet_peers[8].addresses.addresses() {
peers_db
.set_last_seen(&wallet_peers[8].node_id, last_seen, address.address())
.unwrap();
}
assert_eq!(
peers_db
.get_peer_by_node_id(&wallet_peers[8].node_id)
.unwrap()
.unwrap()
.last_seen()
.unwrap(),
last_seen
);
// Seed peers: node peers 0, 4 and 8.
let seed_peers = peers_db.get_seed_peers().unwrap();
assert_eq!(seed_peers.len(), 3);
for peer in &seed_peers {
assert!(peer.is_seed());
}
// Random peers: the explicitly excluded peer (0), the soft-deleted peer (1) and the banned peer (4)
// must never be selected.
let random_peers = peers_db
.get_n_random_peers(5, &[node_peers[0].node_id.clone()], None, &transport_protocols, false)
.unwrap();
assert_eq!(random_peers.len(), 5);
assert!(
!random_peers
.iter()
.any(|n| n.node_id == node_peers[1].node_id || n.node_id == node_peers[4].node_id)
);
assert!(!random_peers.iter().any(|n| n.node_id == node_peers[0].node_id));
// Reset the ban and the last-seen values of node peer 4.
let peer = peers_db.get_peer_by_node_id(&node_peers[4].node_id).unwrap().unwrap();
assert!(peer.is_banned());
assert!(peer.last_seen().is_some());
peers_db.reset_banned(&node_peers[4].node_id).unwrap();
for address in peer.addresses.address_iter() {
peers_db.reset_last_seen(&node_peers[4].node_id, address).unwrap();
}
let peer = peers_db.get_peer_by_node_id(&node_peers[4].node_id).unwrap().unwrap();
assert!(!peer.is_banned());
assert!(peer.last_seen().is_none());
// Ban everyone, then lift all bans at once.
for peer in node_peers.iter().chain(wallet_peers.iter()) {
peers_db
.set_banned(
&peer.node_id,
Duration::from_secs(12345),
"Misbehaviour is punished".to_string(),
)
.unwrap();
}
let all_peers = peers_db.get_all_peers(None).unwrap();
for peer in &all_peers {
assert!(peer.is_banned());
}
peers_db.reset_all_banned().unwrap();
let all_peers = peers_db.get_all_peers(None).unwrap();
for peer in &all_peers {
assert!(!peer.is_banned());
}
// Record a failed connection attempt on every address of every node peer ...
for peer in &node_peers {
let mut peer = peer.clone();
let addresses = peer.addresses.addresses().to_vec();
for address in &addresses {
peer.addresses
.mark_failed_connection_attempt(address.address(), "Misbehave".to_string());
}
peers_db.add_or_update_peer(peer.clone()).unwrap();
}
let all_peers = peers_db.get_all_peers(Some(PeerFeatures::COMMUNICATION_NODE)).unwrap();
for peer in &all_peers {
assert!(peer.last_connect_attempt().is_some());
}
// ... and check that `reset_offline_non_wallet_peers` clears the attempt information again.
peers_db.reset_offline_non_wallet_peers().unwrap();
let all_peers = peers_db.get_all_peers(Some(PeerFeatures::COMMUNICATION_NODE)).unwrap();
for peer in &all_peers {
assert!(peer.last_connect_attempt().is_none(), "peer: {peer}");
}
// Set and reset the last failure reason on every address of node peer 11.
for address in node_peers[11].addresses.addresses() {
peers_db
.set_last_failed_reason(
&node_peers[11].node_id,
"not playing with".to_string(),
address.address(),
)
.unwrap();
}
let peer = peers_db.get_peer_by_node_id(&node_peers[11].node_id).unwrap().unwrap();
assert!(peer.all_addresses_failed());
for address in peer.addresses.address_iter() {
peers_db
.reset_last_failed_reason(&node_peers[11].node_id, address)
.unwrap();
}
let peer = peers_db.get_peer_by_node_id(&node_peers[11].node_id).unwrap().unwrap();
assert!(!peer.all_addresses_failed());
// Metadata: two keys written separately must both be present afterwards.
peers_db
.set_metadata(&node_peers[5].node_id, 111, vec![1, 2, 3])
.unwrap();
peers_db
.set_metadata(&node_peers[5].node_id, 222, vec![4, 5, 6])
.unwrap();
let peer = peers_db.get_peer_by_node_id(&node_peers[5].node_id).unwrap().unwrap();
assert_eq!(peer.metadata.get(&111).unwrap(), &[1, 2, 3]);
assert_eq!(peer.metadata.get(&222).unwrap(), &[4, 5, 6]);
}
// Two different peers may store the same address: adding a peer that reuses another peer's addresses
// must succeed and must leave the original peer's addresses in place.
#[test]
fn test_get_duplicate_addresses_allowed() {
let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
let peers_db = PeerDatabaseSql::new(
db_connection,
&create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
)
.unwrap();
let mut node_peers = Vec::with_capacity(5);
for _i in 0..5 {
let peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
node_peers.push(peer.clone());
peers_db.add_or_update_peer(peer).unwrap();
}
// Build a new peer that additionally carries all addresses of the first stored peer.
let mut peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
let duplicate_address = node_peers[0]
.addresses
.addresses()
.iter()
.map(|v| v.address())
.cloned()
.collect::<Vec<_>>();
for address in &duplicate_address {
peer.addresses
.add_or_update_addresses(std::slice::from_ref(address), &PeerAddressSource::Config)
}
peers_db.add_or_update_peer(peer.clone()).unwrap();
// The new peer has at least one of the duplicated addresses ...
let peer_from_db = peers_db.get_peer_by_node_id(&peer.node_id).unwrap().unwrap();
let peer_addresses = peer_from_db
.addresses
.addresses()
.iter()
.map(|v| v.address())
.cloned()
.collect::<Vec<_>>();
assert!(duplicate_address.iter().any(|v| peer_addresses.contains(v)));
// ... and so does the original owner.
let original_peer_from_db = peers_db.get_peer_by_node_id(&node_peers[0].node_id).unwrap().unwrap();
let original_peer_addresses = original_peer_from_db
.addresses
.addresses()
.iter()
.map(|v| v.address())
.cloned()
.collect::<Vec<_>>();
assert!(duplicate_address.iter().any(|v| original_peer_addresses.contains(v)));
}
// Checks the `external_addresses_only` switch of `get_n_random_active_peers`: peers without any
// external address are left out and returned peers only carry external addresses.
#[test]
fn discovery_syncing_peers_with_external_addresses_only() {
let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
let peers_db = PeerDatabaseSql::new(
db_connection,
&create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
)
.unwrap();
// 40 peers that have both external and internal addresses (asserted below).
for _i in 0..20 {
let peer = create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_NODE);
peers_db.add_or_update_peer(peer).unwrap();
let peer = create_test_peer_add_internal_addresses(false, PeerFeatures::COMMUNICATION_CLIENT);
peers_db.add_or_update_peer(peer).unwrap();
}
assert_eq!(peers_db.size(), 40);
// Without the switch all 40 peers come back with both kinds of addresses.
let nodes_with_all_addresses = peers_db
.get_n_random_active_peers(100, &[], None, None, None, false, &[])
.unwrap();
assert_eq!(nodes_with_all_addresses.len(), 40);
assert!(
nodes_with_all_addresses
.iter()
.all(|p| { p.addresses.addresses().iter().any(|addr| addr.is_external()) })
);
assert!(
nodes_with_all_addresses
.iter()
.all(|p| { p.addresses.addresses().iter().any(|addr| !addr.is_external()) })
);
// 8 more peers with internal addresses only ...
for _i in 0..4 {
let peer = create_test_peer_internal_addresses_only(false, PeerFeatures::COMMUNICATION_NODE);
peers_db.add_or_update_peer(peer).unwrap();
let peer = create_test_peer_internal_addresses_only(false, PeerFeatures::COMMUNICATION_CLIENT);
peers_db.add_or_update_peer(peer).unwrap();
}
// ... and 8 more peers without any address.
for _i in 0..4 {
let mut peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
peer.addresses = MultiaddressesWithStats::new(vec![]);
peers_db.add_or_update_peer(peer).unwrap();
let mut peer = create_test_peer(false, PeerFeatures::COMMUNICATION_CLIENT);
peer.addresses = MultiaddressesWithStats::new(vec![]);
peers_db.add_or_update_peer(peer).unwrap();
}
assert_eq!(peers_db.size(), 56);
let all_peers = peers_db.get_all_peers(None).unwrap();
assert_eq!(all_peers.len(), 56);
// With the switch only the original 40 peers qualify, and only external addresses are returned.
let nodes_with_external_addresses_only = peers_db
.get_n_random_active_peers(100, &[], None, None, None, true, &[])
.unwrap();
assert_eq!(nodes_with_external_addresses_only.len(), 40);
assert!(
nodes_with_external_addresses_only
.iter()
.all(|p| { p.addresses.addresses().iter().all(|addr| addr.is_external()) })
);
}
// Table-driven check of the duration -> milliseconds conversion, including saturation.
#[test]
fn test_duration_to_i64_ms_infallible() {
    // `None` passes straight through.
    assert_eq!(duration_to_i64_ms_infallible(None), None);

    // Inputs given in milliseconds map one-to-one.
    let from_millis: [(u64, i64); 3] = [(0, 0), (42, 42), (1234, 1234)];
    for (input_ms, expected_ms) in from_millis {
        assert_eq!(
            duration_to_i64_ms_infallible(Some(Duration::from_millis(input_ms))),
            Some(expected_ms)
        );
    }

    // Inputs given in seconds are scaled by 1000; the last one overflows `i64` and saturates.
    let from_secs: [(u64, i64); 5] = [
        (12, 12 * 1000),
        (1234, 1234 * 1000),
        (3 * 60 * 60 * 24, 3 * 60 * 60 * 24 * 1000),
        (123 * 60 * 60 * 24, 123 * 60 * 60 * 24 * 1000),
        (u64::MAX, i64::MAX),
    ];
    for (input_secs, expected_ms) in from_secs {
        assert_eq!(
            duration_to_i64_ms_infallible(Some(Duration::from_secs(input_secs))),
            Some(expected_ms)
        );
    }
}
// Table-driven check of the `u32` -> `i32` conversion, including saturation at `i32::MAX`.
#[test]
fn test_u32_to_i32_infallible() {
    let cases: [(u32, i32); 3] = [(0u32, 0i32), (1234u32, 1234i32), (u32::MAX, i32::MAX)];
    for (input, expected) in cases {
        assert_eq!(u32_to_i32_infallible(input), expected);
    }
}
// An address field that goes from `Some` back to `None` in memory must also be cleared in the
// database on the next `add_or_update_peer`.
#[test]
fn it_correctly_updates_none_as_null() {
let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
let peers_db = PeerDatabaseSql::new(
db_connection,
&create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
)
.unwrap();
let mut peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
let address = peer.last_address_used().unwrap();
// Step 1: store the peer with a failure reason on one address and confirm it is persisted.
peer.addresses
.mark_failed_connection_attempt(&address, "This has failed".to_string());
peers_db.add_or_update_peer(peer.clone()).unwrap();
let peer_from_db = peers_db.get_peer_by_node_id(&peer.node_id).unwrap().unwrap();
let last_failed_reason = peer_from_db
.addresses
.addresses()
.iter()
.find(|a| a.address() == &address)
.and_then(|a| a.last_failed_reason());
assert!(last_failed_reason.is_some());
// Step 2: mark the address as seen and store again; the test expects the stored failure reason
// to be gone afterwards.
peer.addresses.mark_last_seen_now(&address);
peers_db.add_or_update_peer(peer.clone()).unwrap();
let peer_from_db = peers_db.get_peer_by_node_id(&peer.node_id).unwrap().unwrap();
let last_failed_reason = peer_from_db
.addresses
.addresses()
.iter()
.find(|a| a.address() == &address)
.and_then(|a| a.last_failed_reason());
assert!(last_failed_reason.is_none());
}
// A peer stored without addresses has no `multi_addresses` rows, yet reads back with a single empty
// placeholder address and is still found by partial key lookups.
#[test]
fn it_correctly_handles_a_peer_without_any_address() {
let db_connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
let peers_db = PeerDatabaseSql::new(
db_connection,
&create_test_peer(false, PeerFeatures::COMMUNICATION_NODE),
)
.unwrap();
let mut peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
peer.addresses = MultiaddressesWithStats::new(vec![]);
peers_db.add_or_update_peer(peer.clone()).unwrap();
let peer_from_db = peers_db.get_peer_by_node_id(&peer.node_id).unwrap();
assert!(peer_from_db.is_some());
let peer_from_db = peer_from_db.unwrap();
// The loaded peer carries exactly one placeholder address with all statistics unset.
assert_eq!(peer_from_db.addresses.addresses().len(), 1);
assert_eq!(peer_from_db.addresses.addresses()[0].address(), &Multiaddr::empty());
assert_eq!(peer_from_db.addresses.addresses()[0].last_seen(), None,);
assert_eq!(peer_from_db.addresses.addresses()[0].connection_attempts(), 0);
assert_eq!(peer_from_db.addresses.addresses()[0].avg_initial_dial_time(), None);
assert_eq!(
peer_from_db.addresses.addresses()[0].initial_dial_time_sample_count(),
0
);
assert_eq!(peer_from_db.addresses.addresses()[0].avg_latency(), None);
assert_eq!(peer_from_db.addresses.addresses()[0].latency_sample_count(), 0);
assert_eq!(peer_from_db.addresses.addresses()[0].last_attempted(), None);
assert_eq!(peer_from_db.addresses.addresses()[0].last_failed_reason(), None);
assert_eq!(peer_from_db.addresses.addresses()[0].quality_score(), None);
assert_eq!(
peer_from_db.addresses.addresses()[0].source(),
&PeerAddressSource::Config
);
assert_eq!(peer_from_db.node_id, peer.node_id);
assert_eq!(peer_from_db.public_key, peer.public_key);
assert_eq!(peer_from_db.features, peer.features);
// Partial key lookups by node id and by public key still find the peer.
for i in 1..NodeId::byte_size() {
let matches = peers_db
.find_all_peers_match_partial_key(&peer.node_id.as_bytes()[0..i])
.unwrap();
assert!(matches.contains(&peer));
}
for i in 1..CommsPublicKey::key_length() {
let matches = peers_db
.find_all_peers_match_partial_key(&peer.public_key.as_bytes()[0..i])
.unwrap();
assert!(matches.contains(&peer));
}
// At table level: one peer row and no address rows (the placeholder exists only in memory).
let mut conn = peers_db.connection.get_pooled_connection().unwrap();
let peer_query: Vec<NewPeerSql> = peers::table.load::<NewPeerSql>(&mut conn).unwrap();
assert_eq!(peer_query.len(), 1);
let addresses_query: Vec<NewMultiaddrWithStatsSql> = multi_addresses::table
.load::<NewMultiaddrWithStatsSql>(&mut conn)
.unwrap();
assert!(addresses_query.is_empty());
}
#[test]
#[allow(clippy::too_many_lines)]
fn test_get_available_dial_candidates() {
    // Fills a fresh database with 100 peers (every fourth one flagged as a seed), makes three of them
    // undialable in different ways, and then checks the dial-candidate query with respect to exclusion
    // lists, result limits and repeatability.
    const PEER_COUNT: usize = 100;
    const SOFT_DELETED_IDX: usize = 1;
    const BANNED_IDX: usize = 5;
    const FAILED_IDX: usize = 7;

    let connection = DbConnection::connect_temp_file_and_migrate(MIGRATIONS).unwrap();
    let this_peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
    let peers_db = PeerDatabaseSql::new(connection, &this_peer).unwrap();
    let all_protocols = TransportProtocol::get_all();

    // Create and store the peers, keeping a local copy of each for later lookups by index.
    let node_peers = (0..PEER_COUNT)
        .map(|idx| {
            let mut new_peer = create_test_peer(false, PeerFeatures::COMMUNICATION_NODE);
            if idx % 4 == 0 {
                new_peer.flags = PeerFlags::SEED;
            }
            peers_db.add_or_update_peer(new_peer.clone()).unwrap();
            new_peer
        })
        .collect::<Vec<_>>();

    // Three peers (none of them seeds) are taken out of play: one soft-deleted, one banned and one with a
    // failure reason recorded against every one of its addresses.
    peers_db.soft_delete_peer(&node_peers[SOFT_DELETED_IDX].node_id).unwrap();
    peers_db
        .set_banned(
            &node_peers[BANNED_IDX].node_id,
            Duration::from_secs(12345),
            "Misbehaviour is punished".to_string(),
        )
        .unwrap();
    let failed_peer = &node_peers[FAILED_IDX];
    for addr_with_stats in failed_peer.addresses.addresses() {
        peers_db
            .set_last_failed_reason(
                &failed_peer.node_id,
                "not playing with".to_string(),
                addr_with_stats.address(),
            )
            .unwrap();
    }

    // True when the given node id belongs to one of the three peers disabled above.
    let is_unavailable = |node_id: &NodeId| {
        [SOFT_DELETED_IDX, BANNED_IDX, FAILED_IDX]
            .iter()
            .any(|&idx| node_peers[idx].node_id == *node_id)
    };

    // All peers are still reported by the unfiltered query.
    assert_eq!(peers_db.get_all_peers(None).unwrap().len(), PEER_COUNT);

    let seed_node_ids: Vec<_> = peers_db
        .get_seed_peers()
        .unwrap()
        .iter()
        .map(|seed| seed.node_id.clone())
        .collect();
    assert_eq!(seed_node_ids.len(), 25);

    // No exclusions and no limit: everything except the three disabled peers.
    let candidates = peers_db
        .get_available_dial_candidates(&[], None, &all_protocols, true, false)
        .unwrap();
    assert_eq!(candidates.len(), 97);
    assert!(candidates.iter().all(|c| !is_unavailable(&c.node_id)));

    // Excluding the seeds removes a further 25 peers.
    let candidates = peers_db
        .get_available_dial_candidates(&seed_node_ids, None, &all_protocols, true, false)
        .unwrap();
    assert_eq!(candidates.len(), 72);
    assert!(candidates.iter().all(|c| !is_unavailable(&c.node_id)));
    assert!(
        seed_node_ids
            .iter()
            .all(|seed_id| candidates.iter().all(|c| c.node_id != *seed_id))
    );

    // Limited queries: fetch `limit` candidates and return their node ids in sorted order.
    // NOTE(review): the final boolean presumably switches on randomised selection — confirm against the
    // signature of `get_available_dial_candidates`.
    let limit = 17;
    let sorted_limited_ids = |final_flag: bool| {
        let mut ids = peers_db
            .get_available_dial_candidates(&[], Some(limit), &all_protocols, true, final_flag)
            .unwrap()
            .iter()
            .map(|c| c.node_id.clone())
            .collect::<Vec<_>>();
        assert_eq!(ids.len(), limit);
        ids.sort();
        ids
    };
    let first_run = sorted_limited_ids(false);
    let second_run = sorted_limited_ids(false);
    let flagged_run = sorted_limited_ids(true);

    // With the flag off the selection is repeatable; with it on a different set is expected.
    assert_eq!(first_run, second_run);
    assert_ne!(first_run, flagged_run);
}
}