use std::{
cmp,
collections::HashMap,
convert::TryFrom,
fmt::Display,
hash::{Hash, Hasher},
time::Duration,
};
use bitflags::bitflags;
use chrono::{NaiveDateTime, Utc};
use multiaddr::Multiaddr;
use serde::{Deserialize, Serialize};
use tari_utilities::hex::serialize_to_hex;
use super::{
PeerFeatures,
node_id::{NodeId, deserialize_node_id_from_hex},
peer_id::PeerId,
};
use crate::{
net_address::{MultiaddressesWithStats, PeerAddressSource},
protocol::ProtocolId,
types::CommsPublicKey,
utils::datetime::{format_local_datetime, is_max_datetime, safe_future_datetime_from_duration},
};
bitflags! {
/// Bit flags attached to a `Peer` (stored in `Peer::flags`), backed by a single `u8`.
#[derive(Default, Deserialize, Serialize, Eq, PartialEq, Debug, Clone, Copy)]
pub struct PeerFlags: u8 {
/// No flags set (all bits zero).
const NONE = 0x00;
/// Marks the peer as a seed peer; this is the bit checked by `Peer::is_seed`.
const SEED = 0x01;
}
}
impl PeerFlags {
    /// Returns the raw flag bits widened to an `i32`.
    ///
    /// The underlying storage is a `u8`, so the result is always in `0..=255`.
    pub fn to_i32(&self) -> i32 {
        i32::from(self.bits())
    }
}
/// A remote peer together with its addresses, ban state and descriptive metadata.
///
/// Note that equality and hashing (see the `PartialEq` and `Hash` impls in this file) only
/// consider `public_key`; every other field is ignored for those purposes.
#[derive(Debug, Clone, Deserialize, Serialize, Eq)]
pub struct Peer {
/// Storage identifier. `None` until the peer has been persisted (see `Peer::is_persisted`).
pub(super) id: Option<PeerId>,
/// The peer's public key; the sole basis for equality and hashing.
pub public_key: CommsPublicKey,
/// The peer's node id. Serialized to / deserialized from a hex string.
#[serde(serialize_with = "serialize_to_hex")]
#[serde(deserialize_with = "deserialize_node_id_from_hex")]
pub node_id: NodeId,
/// Known network addresses for this peer, including per-address statistics.
pub addresses: MultiaddressesWithStats,
/// Flags set on this peer (e.g. `PeerFlags::SEED`).
pub flags: PeerFlags,
/// When the ban expires. The peer only counts as banned while this lies in the future
/// (see `Peer::banned_until`).
pub banned_until: Option<NaiveDateTime>,
/// Human-readable reason for the ban; empty when no reason has been recorded.
pub banned_reason: String,
/// Features advertised by the peer.
pub features: PeerFeatures,
/// Protocols the peer is known to support.
pub supported_protocols: Vec<ProtocolId>,
/// Timestamp at which the peer was added; `Peer::new` sets it to the current UTC time.
pub added_at: NaiveDateTime,
/// User agent string reported by the peer; empty when unknown.
pub user_agent: String,
/// Arbitrary binary metadata, keyed by a `u8` identifier.
pub metadata: HashMap<u8, Vec<u8>>,
/// NOTE(review): presumably the soft-deletion timestamp — nothing in this file reads it
/// beyond construction; confirm against the storage layer.
pub deleted_at: Option<NaiveDateTime>,
}
impl Peer {
pub fn new(
public_key: CommsPublicKey,
node_id: NodeId,
addresses: MultiaddressesWithStats,
flags: PeerFlags,
features: PeerFeatures,
supported_protocols: Vec<ProtocolId>,
user_agent: String,
) -> Peer {
Peer {
id: None,
public_key,
node_id,
addresses,
flags,
features,
banned_until: None,
banned_reason: String::new(),
added_at: Utc::now().naive_utc(),
supported_protocols,
user_agent,
metadata: HashMap::new(),
deleted_at: None,
}
}
#[allow(clippy::too_many_arguments)]
pub fn new_with_stats(
id: Option<PeerId>,
public_key: CommsPublicKey,
node_id: NodeId,
addresses: MultiaddressesWithStats,
flags: PeerFlags,
banned_until: Option<NaiveDateTime>,
banned_reason: String,
features: PeerFeatures,
supported_protocols: Vec<ProtocolId>,
added_at: NaiveDateTime,
user_agent: String,
metadata: HashMap<u8, Vec<u8>>,
deleted_at: Option<NaiveDateTime>,
) -> Peer {
Peer {
id,
public_key,
node_id,
addresses,
flags,
features,
banned_until,
banned_reason,
added_at,
supported_protocols,
user_agent,
metadata,
deleted_at,
}
}
pub fn id(&self) -> PeerId {
self.id.expect("call to Peer::id() when peer is not persisted")
}
pub fn merge(&mut self, other: &Peer) {
self.addresses.merge(&other.addresses);
if !other.banned_reason.is_empty() {
self.banned_reason = other.banned_reason.clone();
}
self.banned_until = cmp::max(self.banned_until, other.banned_until);
self.added_at = cmp::min(self.added_at, other.added_at);
for protocol in &other.supported_protocols {
if !self.supported_protocols.contains(protocol) {
self.supported_protocols.push(protocol.clone());
}
}
self.metadata = other.metadata.clone();
self.features = other.features;
self.flags = other.flags;
if !other.user_agent.is_empty() {
self.user_agent = other.user_agent.clone();
}
}
pub fn is_persisted(&self) -> bool {
self.id.is_some()
}
pub fn supported_protocols(&self) -> &[ProtocolId] {
&self.supported_protocols
}
pub fn last_connect_attempt(&self) -> Option<NaiveDateTime> {
self.addresses
.addresses()
.iter()
.max_by_key(|a| a.last_attempted())
.and_then(|a| a.last_attempted())
}
pub fn last_address_used(&self) -> Option<Multiaddr> {
self.addresses
.addresses()
.iter()
.max_by_key(|a| a.last_attempted())
.map(|a| a.address().clone())
}
pub fn is_offline(&self) -> bool {
self.addresses.offline_at().is_some()
}
pub fn offline_at(&self) -> Option<NaiveDateTime> {
self.addresses.offline_at()
}
pub fn offline_since(&self) -> Option<Duration> {
let offline_at = self.addresses.offline_at();
offline_at
.map(|offline_at| Utc::now().naive_utc() - offline_at)
.map(|since| Duration::from_secs(u64::try_from(since.num_seconds()).unwrap_or(0)))
}
pub(super) fn set_id(&mut self, id: PeerId) {
self.id = Some(id);
}
#[cfg(test)]
pub(crate) fn set_id_for_test(&mut self, id: PeerId) {
self.id = Some(id);
}
pub fn last_seen(&self) -> Option<NaiveDateTime> {
self.addresses.last_seen()
}
pub fn all_addresses_failed(&self) -> bool {
self.addresses.iter().all(|a| a.last_failed_reason().is_some())
}
pub fn last_seen_since(&self) -> Option<Duration> {
self.last_seen()
.and_then(|dt| Utc::now().naive_utc().signed_duration_since(dt).to_std().ok())
}
pub fn is_banned(&self) -> bool {
self.banned_until().is_some()
}
pub fn reason_banned(&self) -> &str {
&self.banned_reason
}
pub fn ban_for(&mut self, duration: Duration, reason: String) {
let dt = safe_future_datetime_from_duration(duration);
self.banned_until = Some(dt.naive_utc());
self.banned_reason = reason;
}
pub fn unban(&mut self) {
self.banned_until = None;
self.banned_reason = "".to_string();
}
pub fn banned_until(&self) -> Option<&NaiveDateTime> {
self.banned_until.as_ref().filter(|dt| *dt > &Utc::now().naive_utc())
}
pub fn set_metadata(&mut self, key: u8, data: Vec<u8>) -> Option<Vec<u8>> {
self.metadata.insert(key, data)
}
pub fn get_metadata(&self, key: u8) -> Option<&Vec<u8>> {
self.metadata.get(&key)
}
pub fn update_addresses(&mut self, addresses: &[Multiaddr], source: &PeerAddressSource) -> &mut Self {
self.addresses.add_or_update_addresses(addresses, source);
self
}
pub fn set_features(&mut self, features: PeerFeatures) -> &mut Self {
if self.features != features {
self.features = features;
}
self
}
pub fn add_flags(&mut self, flags: PeerFlags) -> &mut Self {
self.flags |= flags;
self
}
pub fn is_seed(&self) -> bool {
self.flags.contains(PeerFlags::SEED)
}
pub fn to_short_string(&self) -> String {
format!("{}::{}", self.public_key, self.addresses)
}
}
impl Display for Peer {
    /// Formats the peer as a single human-readable line:
    /// `<flags>[<short node id>] PK=<public key> (<addresses>) - <status>. Type: <type>. User agent: <ua>.`
    ///
    /// - `<flags>` is empty when no flags are set, otherwise the `Debug` form of the flags.
    /// - `<status>` lists the offline time and/or active ban details, joined by `". "`; it is
    ///   empty when neither applies.
    /// - `<ua>` is `<unknown>` when the user agent string is empty.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let flags_str = if self.flags.is_empty() {
            String::new()
        } else {
            format!("{:?}", self.flags)
        };

        let mut status_parts = Vec::new();
        if let Some(offline_at) = self.offline_at() {
            status_parts.push(format!("Offline since: {}", format_local_datetime(&offline_at)));
        }
        // `banned_until()` only yields bans that are still active.
        if let Some(until) = self.banned_until() {
            if is_max_datetime(until) {
                status_parts.push("Banned permanently".to_string());
            } else {
                status_parts.push(format!("Banned until: {}", format_local_datetime(until)));
            }
            status_parts.push(format!("Reason: {}", self.banned_reason));
        }
        let status_str = status_parts.join(". ");

        let peer_type = match self.features {
            PeerFeatures::COMMUNICATION_NODE => "BASE_NODE".to_string(),
            PeerFeatures::COMMUNICATION_CLIENT => "WALLET".to_string(),
            other => format!("{other:?}"),
        };

        let user_agent = if self.user_agent.is_empty() {
            "<unknown>"
        } else {
            self.user_agent.as_str()
        };

        // Write straight into the formatter rather than building the whole line in a
        // temporary `String` first.
        write!(
            f,
            "{}[{}] PK={} ({}) - {}. Type: {}. User agent: {}.",
            flags_str,
            self.node_id.short_str(),
            self.public_key,
            self.addresses,
            status_str,
            peer_type,
            user_agent,
        )
    }
}
/// Two peers are considered equal when their public keys are equal; no other field takes
/// part in the comparison.
impl PartialEq for Peer {
    fn eq(&self, other: &Self) -> bool {
        let (ours, theirs) = (&self.public_key, &other.public_key);
        ours == theirs
    }
}
/// Hashes only the public key, keeping `Hash` consistent with the `PartialEq` impl for `Peer`
/// (which also compares the public key alone).
impl Hash for Peer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.public_key, state);
    }
}
#[cfg(test)]
mod test {
    #![allow(clippy::indexing_slicing)]
    use serde_json::Value;
    use tari_crypto::tari_utilities::{hex::Hex, message_format::MessageFormat};

    use super::*;

    /// Builds an address set containing only `addr`, attributed to the config source.
    fn config_addresses(addr: &str) -> MultiaddressesWithStats {
        let parsed = addr.parse::<Multiaddr>().unwrap();
        MultiaddressesWithStats::from_addresses_with_source(vec![parsed], &PeerAddressSource::Config)
    }

    /// A freshly created peer is not banned, a very long ban is active, and a zero-length ban
    /// is not (its expiry is not in the future).
    #[test]
    fn test_is_banned_and_ban_for() {
        let mut rng = rand::rngs::OsRng;
        let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
        let node_id = NodeId::from_key(&pk);
        let mut peer: Peer = Peer::new(
            pk,
            node_id,
            config_addresses("/ip4/123.0.0.123/tcp/8000"),
            PeerFlags::default(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );
        assert!(!peer.is_banned());

        peer.ban_for(Duration::from_millis(u64::MAX), "Very long manual ban".to_string());
        assert_eq!(peer.reason_banned(), "Very long manual ban");
        assert!(peer.is_banned());

        peer.ban_for(Duration::from_millis(0), "".to_string());
        assert!(!peer.is_banned());
    }

    /// The JSON form of a peer exposes the public key and node id as hex strings.
    #[test]
    fn json_ser_der() {
        let expected_pk_hex = "02622ace8f7303a31cafc63f8fc48fdc16e1c8c8d234b2f0d6685282a9076031";
        let expected_nodeid_hex = "c1a7552e5d9e9b257c4008b965";
        let pk = CommsPublicKey::from_hex(expected_pk_hex).unwrap();
        let node_id = NodeId::from_key(&pk);
        let peer = Peer::new(
            pk,
            node_id,
            config_addresses("/ip4/127.0.0.1/tcp/9000"),
            PeerFlags::empty(),
            PeerFeatures::empty(),
            Default::default(),
            Default::default(),
        );

        let serialized = peer.to_json().unwrap();
        let json: Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json["public_key"], expected_pk_hex);
        assert_eq!(json["node_id"], expected_nodeid_hex);
    }
}