use std::{
convert::{TryFrom, TryInto},
fmt::{Display, Formatter},
time::Duration,
};
use log::*;
use tari_utilities::hex::Hex;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
connection_manager::error::ConnectionManagerError,
multiaddr::Multiaddr,
net_address::{MultiaddressesWithStats, PeerAddressSource},
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags, PeerIdentityClaim, PeerManagerError},
peer_validator::{validate_peer_identity_claim, PeerValidatorConfig, PeerValidatorError},
proto::identity::PeerIdentityMsg,
protocol,
protocol::{NodeNetworkInfo, ProtocolId},
types::CommsPublicKey,
PeerManager,
};
// Log target passed to the `debug!` / `error!` macros used in this file.
const LOG_TARGET: &str = "comms::connection_manager::common";
/// The outcome of a successfully validated peer identity message.
///
/// Produced by `validate_peer_identity_message` once the size limits, the conversions and the identity claim
/// check have all passed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ValidatedPeerIdentityExchange {
/// The identity claim (addresses, features and signature) assembled from the peer's message.
pub claim: PeerIdentityClaim,
/// The remaining fields of the message: the user agent and the supported protocols.
pub metadata: PeerIdentityMetadata,
}
/// Optional descriptive details about a peer connection.
///
/// Every field is optional; the `Display` implementation substitutes default values for anything missing.
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct PeerConnectionInfo {
// The peer's public key, if known.
public_key: Option<CommsPublicKey>,
// Node ID derived from `public_key` via `NodeId::from_public_key` in `new`.
// NOTE(review): the name `node_if` looks like a typo for `node_id` (the Display output labels it `node_id`)
// - confirm before renaming.
node_if: Option<NodeId>,
// The peer's features, if known.
features: Option<PeerFeatures>,
// The peer's user agent string, if known.
user_agent: Option<String>,
}
impl PeerConnectionInfo {
    /// Creates a new `PeerConnectionInfo`.
    ///
    /// When `public_key` is `Some`, the node ID is derived from it with `NodeId::from_public_key`; otherwise the
    /// node ID is left as `None`. `features` and `user_agent` are stored as given.
    pub fn new(public_key: Option<CommsPublicKey>, features: Option<PeerFeatures>, user_agent: Option<String>) -> Self {
        // Derive the node ID from a borrow first so that the key itself can be moved into the struct without
        // an extra clone.
        let node_if = public_key.as_ref().map(NodeId::from_public_key);
        Self {
            public_key,
            node_if,
            features,
            user_agent,
        }
    }
}
impl Display for PeerConnectionInfo {
    /// Writes a single-line summary of the connection info.
    ///
    /// Missing fields are rendered using their default values: the default public key (as hex), the default
    /// node ID, the default features and an empty user agent.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let public_key_hex = match self.public_key.as_ref() {
            Some(public_key) => public_key.to_hex(),
            None => CommsPublicKey::default().to_hex(),
        };
        let fallback_node_id = NodeId::default();
        let node_id = self.node_if.as_ref().unwrap_or(&fallback_node_id);
        let features = self.features.unwrap_or_default();
        let user_agent = self.user_agent.as_deref().unwrap_or("");

        write!(
            f,
            "PeerConnectionInfo(public_key: {}, node_id: {}, features: {}, user_agent: {})",
            public_key_hex, node_id, features, user_agent
        )
    }
}
impl ValidatedPeerIdentityExchange {
    /// Returns the features recorded in the validated identity claim.
    pub fn peer_features(&self) -> PeerFeatures {
        let claim = &self.claim;
        claim.features
    }

    /// Returns the protocols listed in the peer's metadata.
    pub fn supported_protocols(&self) -> &[ProtocolId] {
        self.metadata.supported_protocols.as_slice()
    }

    /// Returns the user agent string listed in the peer's metadata.
    pub fn user_agent(&self) -> &str {
        &self.metadata.user_agent
    }
}
/// The parts of a peer identity message that are not part of the identity claim itself.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PeerIdentityMetadata {
/// User agent string sent by the peer.
pub user_agent: String,
/// Protocols the peer listed as supported.
pub supported_protocols: Vec<ProtocolId>,
}
pub(super) async fn perform_identity_exchange<
'p,
P: IntoIterator<Item = &'p ProtocolId>,
TSocket: AsyncRead + AsyncWrite + Unpin,
>(
socket: &mut TSocket,
node_identity: &NodeIdentity,
our_supported_protocols: P,
network_info: NodeNetworkInfo,
) -> Result<PeerIdentityMsg, ConnectionManagerError> {
let peer_identity =
protocol::identity_exchange(node_identity, our_supported_protocols, network_info, socket).await?;
Ok(peer_identity)
}
pub(super) fn validate_peer_identity_message(
config: &PeerValidatorConfig,
authenticated_public_key: &CommsPublicKey,
peer_identity_msg: PeerIdentityMsg,
) -> Result<ValidatedPeerIdentityExchange, ConnectionManagerError> {
let PeerIdentityMsg {
addresses,
features,
supported_protocols,
user_agent,
identity_signature,
} = peer_identity_msg;
if supported_protocols.len() > config.max_supported_protocols {
return Err(PeerValidatorError::PeerIdentityTooManyProtocols {
length: supported_protocols.len(),
max: config.max_supported_protocols,
}
.into());
}
if let Some(proto) = supported_protocols
.iter()
.find(|p| p.len() > config.max_protocol_id_length)
{
return Err(PeerValidatorError::PeerIdentityProtocolIdTooLong {
length: proto.len(),
max: config.max_protocol_id_length,
}
.into());
}
if addresses.is_empty() {
debug!(target: LOG_TARGET, "validate_peer_identity_message - peer address claim contains no addresses.");
}
if addresses.len() > config.max_permitted_peer_addresses_per_claim {
return Err(PeerValidatorError::PeerIdentityTooManyAddresses {
length: addresses.len(),
max: config.max_permitted_peer_addresses_per_claim,
}
.into());
}
if user_agent.len() > config.max_user_agent_byte_length {
return Err(PeerValidatorError::PeerIdentityUserAgentTooLong {
length: user_agent.len(),
max: config.max_user_agent_byte_length,
}
.into());
}
let supported_protocols = supported_protocols.into_iter().map(ProtocolId::from).collect();
let addresses = addresses
.into_iter()
.map(Multiaddr::try_from)
.collect::<Result<Vec<_>, _>>()
.map_err(|e| PeerManagerError::MultiaddrError(e.to_string()))?;
let peer_identity_claim = PeerIdentityClaim {
addresses,
features: PeerFeatures::from_bits(features).ok_or(PeerManagerError::InvalidPeerFeatures { bits: features })?,
signature: identity_signature
.ok_or(PeerManagerError::MissingIdentitySignature)?
.try_into()?,
};
validate_peer_identity_claim(config, authenticated_public_key, &peer_identity_claim)?;
Ok(ValidatedPeerIdentityExchange {
claim: peer_identity_claim,
metadata: PeerIdentityMetadata {
user_agent,
supported_protocols,
},
})
}
/// Builds the `Peer` record that corresponds to a validated identity exchange.
///
/// * If `known_peer` is `Some`, that peer is updated in place: the claimed addresses are added or updated, marked
///   as seen now with `latency`, and the features, supported protocols and user agent are overwritten with the
///   values from `peer_identity`.
/// * If `known_peer` is `None`, a new `Peer` is created from `authenticated_public_key`, the node ID derived
///   from it, the claimed addresses (marked as seen now with `latency`), empty flags, and the features,
///   protocols and user agent from `peer_identity`.
///
/// In both cases the address source is `PeerAddressSource::FromPeerConnection` carrying a copy of the claim.
pub(super) fn create_or_update_peer_from_validated_peer_identity(
    known_peer: Option<Peer>,
    authenticated_public_key: CommsPublicKey,
    peer_identity: &ValidatedPeerIdentityExchange,
    latency: Duration,
) -> Peer {
    let peer_node_id = NodeId::from_public_key(&authenticated_public_key);
    let claimed_addresses = &peer_identity.claim.addresses;
    let address_source = PeerAddressSource::FromPeerConnection {
        peer_identity_claim: peer_identity.claim.clone(),
    };

    if let Some(mut existing) = known_peer {
        debug!(
            target: LOG_TARGET,
            "Peer '{}' already exists in peer list. Updating.",
            existing.node_id.short_str()
        );
        existing
            .addresses
            .add_or_update_addresses(claimed_addresses, &address_source);
        existing
            .addresses
            .mark_all_addresses_as_last_seen_now_with_latency(claimed_addresses, latency);
        existing.features = peer_identity.peer_features();
        existing.supported_protocols = peer_identity.supported_protocols().to_vec();
        existing.user_agent = peer_identity.user_agent().to_string();
        return existing;
    }

    debug!(
        target: LOG_TARGET,
        "Peer '{}' does not exist in peer list. Adding.",
        peer_node_id.short_str()
    );
    let mut new_addresses =
        MultiaddressesWithStats::from_addresses_with_source(claimed_addresses.clone(), &address_source);
    new_addresses.mark_all_addresses_as_last_seen_now_with_latency(claimed_addresses, latency);

    Peer::new(
        authenticated_public_key,
        peer_node_id,
        new_addresses,
        PeerFlags::empty(),
        peer_identity.claim.features,
        peer_identity.metadata.supported_protocols.clone(),
        peer_identity.metadata.user_agent.clone(),
    )
}
/// Looks up a peer by public key and rejects it if it is banned.
///
/// Returns `Ok(Some(peer))` for a known peer that is not banned, `Ok(None)` when the lookup finds nothing, and
/// `Err(ConnectionManagerError::PeerBanned)` when the peer found reports `is_banned()`. Errors from the peer
/// manager lookup are converted into `ConnectionManagerError`.
pub(super) async fn find_unbanned_peer(
    peer_manager: &PeerManager,
    authenticated_public_key: &CommsPublicKey,
) -> Result<Option<Peer>, ConnectionManagerError> {
    let found = peer_manager.find_by_public_key(authenticated_public_key).await?;
    match found {
        Some(peer) if peer.is_banned() => Err(ConnectionManagerError::PeerBanned),
        other => Ok(other),
    }
}
/// Passes `result` through, possibly banning the peer when it is a validation or identity protocol error.
///
/// * `Ok` values are returned unchanged.
/// * For `PeerValidationError` and `IdentityProtocolError`, the inner error's `as_ban_duration()` is handed to
///   `maybe_ban` together with the error; the error is then returned to the caller.
/// * Every other error is returned unchanged without touching the peer manager.
pub(super) async fn ban_on_offence<T>(
    peer_manager: &PeerManager,
    authenticated_public_key: &CommsPublicKey,
    result: Result<T, ConnectionManagerError>,
) -> Result<T, ConnectionManagerError> {
    let failure = match result {
        Ok(value) => return Ok(value),
        Err(failure) => failure,
    };

    match failure {
        ConnectionManagerError::PeerValidationError(inner) => {
            let ban_duration = inner.as_ban_duration();
            maybe_ban(peer_manager, authenticated_public_key, ban_duration, inner).await
        },
        ConnectionManagerError::IdentityProtocolError(inner) => {
            let ban_duration = inner.as_ban_duration();
            maybe_ban(peer_manager, authenticated_public_key, ban_duration, inner).await
        },
        other => Err(other),
    }
}
async fn maybe_ban<T, E: ToString + Into<ConnectionManagerError>>(
peer_manager: &PeerManager,
authenticated_public_key: &CommsPublicKey,
ban_duration: Option<Duration>,
err: E,
) -> Result<T, ConnectionManagerError> {
if let Some(ban_duration) = ban_duration {
if let Err(pe) = peer_manager
.ban_peer(authenticated_public_key, ban_duration, err.to_string())
.await
{
error!(target: LOG_TARGET, "Failed to ban peer due to internal error: {}. Original ban error: {}", pe, err.to_string());
}
}
Err(err.into())
}