use crate::{
ConnectionMode,
NodeType,
PeerPoolHandling,
Router,
messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait},
};
use snarkos_node_network::{get_repo_commit_hash, log_repo_sha_comparison};
use snarkos_node_tcp::{ConnectError, ConnectionSide, P2P, Tcp};
use snarkvm::{
ledger::narwhal::Data,
prelude::{Address, ConsensusVersion, Field, Network, block::Header},
};
use anyhow::{Result, anyhow};
use futures::SinkExt;
use rand::{Rng, rngs::OsRng};
use std::{io, net::SocketAddr};
use tokio::net::TcpStream;
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;
/// Implements the `P2P` trait for the router by handing out its `Tcp` instance.
impl<N: Network> P2P for Router<N> {
/// Returns a reference to the `Tcp` instance reachable through the router's `tcp` field.
fn tcp(&self) -> &Tcp {
&self.tcp
}
}
/// Reads the next frame from `$framed` and unwraps it as the `$msg_ty` message variant.
///
/// On success the macro evaluates to the payload carried by the `$msg_ty` variant.
/// In every other case it `return`s from the *enclosing* function with an error:
/// - a `Disconnect` message is reported together with the reason the peer supplied;
/// - any other message is reported as a violation of the handshake protocol;
/// - an exhausted stream is reported as an I/O `BrokenPipe` error;
/// - an error yielded by `try_next()` itself is propagated with `?`.
///
/// The expansion contains `.await`, `?` and `return`, so it can only be used inside an
/// `async` function whose error type accepts those values. `trace!`, `Message`,
/// `ConnectError` and `io` are written unqualified here and must therefore be in scope
/// wherever the macro is invoked.
#[macro_export]
macro_rules! expect_message {
    ($msg_ty:path, $framed:expr, $peer_addr:expr) => {{
        // A finished stream means the peer went away before sending the expected message.
        let Some(frame) = $framed.try_next().await? else {
            return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into()));
        };
        match frame {
            // The expected message arrived; hand its payload back to the caller.
            $msg_ty(payload) => {
                trace!("Received '{}' from '{}'", payload.name(), $peer_addr);
                payload
            }
            // The peer chose to terminate the handshake; surface the reason it gave.
            Message::Disconnect($crate::messages::Disconnect { reason }) => {
                return Err(ConnectError::other(format!("'{}' disconnected with reason \"{reason}\"", $peer_addr)));
            }
            // Anything else is out of sequence for the handshake.
            unexpected => {
                return Err(ConnectError::other(format!(
                    "'{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    unexpected.name(),
                    stringify!($msg_ty),
                )));
            }
        }
    }};
}
async fn send<N: Network>(
framed: &mut Framed<&mut TcpStream, MessageCodec<N>>,
peer_addr: SocketAddr,
message: Message<N>,
) -> io::Result<()> {
trace!("Sending '{}' to '{peer_addr}'", message.name());
framed.send(message).await
}
impl<N: Network> Router<N> {
/// Executes the handshake protocol with the peer at `peer_addr` and records the outcome
/// in the peer pool.
///
/// `peer_side` describes the role of the *peer*: when the peer is the `Initiator` the
/// connection is inbound and this node runs the responder half of the handshake; when
/// the peer is the `Responder` this node runs the initiator half.
///
/// Unless the node is in dev mode (and unless compiled with the `test` feature), inbound
/// connections are first checked against the IP ban list and the per-IP connection
/// attempt counter; exceeding `MAX_CONNECTION_ATTEMPTS` bans the IP.
///
/// Once a listening address for the peer is known, a successful handshake upgrades the
/// matching peer-pool entry to connected (and registers it with the resolver), while a
/// failed handshake downgrades an entry that is still connecting back to a candidate.
///
/// Returns the peer's `ChallengeRequest` on success, or the `ConnectError` describing
/// why the connection was rejected.
pub async fn handshake<'a>(
    &'a self,
    peer_addr: SocketAddr,
    stream: &'a mut TcpStream,
    peer_side: ConnectionSide,
    genesis_header: Header<N>,
    restrictions_id: Field<N>,
) -> Result<ChallengeRequest<N>, ConnectError> {
    // For an inbound connection the listening address is not known yet; the responder
    // half of the handshake fills it in. For an outbound one it is the dialed address.
    let mut listener_addr = match peer_side {
        ConnectionSide::Initiator => {
            debug!("Received a connection request from '{peer_addr}'");
            None
        }
        ConnectionSide::Responder => {
            debug!("Shaking hands with '{peer_addr}'...");
            Some(peer_addr)
        }
    };

    // Check (or impose) IP-level bans on inbound connections.
    #[cfg(not(feature = "test"))]
    if !self.is_dev() && matches!(peer_side, ConnectionSide::Initiator) {
        let peer_ip = peer_addr.ip();

        // An already banned IP is turned away immediately.
        if self.is_ip_banned(peer_ip) {
            trace!("Rejected a connection request from banned IP '{}'", peer_ip);
            return Err(ConnectError::other(anyhow!("'{}' is a banned IP address", peer_ip)));
        }

        // Record this attempt and ban the IP if it connects too frequently.
        let num_attempts = self.cache.insert_inbound_connection(peer_ip, Router::<N>::CONNECTION_ATTEMPTS_SINCE_SECS);
        debug!("Number of connection attempts from '{}': {}", peer_ip, num_attempts);
        if num_attempts > Router::<N>::MAX_CONNECTION_ATTEMPTS {
            self.update_ip_ban(peer_ip);
            trace!("Rejected a consecutive connection request from IP '{}'", peer_ip);
            return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_ip)));
        }
    }

    // Run the half of the handshake that matches our role. The responder half receives
    // `listener_addr` mutably so the address is recorded even if the handshake fails later.
    let handshake_result = match peer_side {
        ConnectionSide::Responder => {
            self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
        }
        ConnectionSide::Initiator => {
            self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id)
                .await
        }
    };

    // Without a listening address there is no peer-pool entry to update.
    let Some(addr) = listener_addr else {
        return handshake_result;
    };

    match &handshake_result {
        Ok(request) => {
            if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
                self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, Some(request.address));
                peer.upgrade_to_connected(
                    peer_addr,
                    request.listener_port,
                    request.address,
                    request.node_type,
                    request.version,
                    request.snarkos_sha,
                    ConnectionMode::Router,
                );
            }
            #[cfg(feature = "metrics")]
            self.update_metrics();
        }
        Err(_) => {
            if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
                // Only an entry that is still connecting is moved back to the candidates.
                if peer.is_connecting() {
                    peer.downgrade_to_candidate(addr);
                }
            }
        }
    }

    handshake_result
}
async fn handshake_inner_initiator<'a>(
&'a self,
peer_addr: SocketAddr,
stream: &'a mut TcpStream,
genesis_header: Header<N>,
restrictions_id: Field<N>,
) -> Result<ChallengeRequest<N>, ConnectError> {
self.add_connecting_peer(peer_addr)?;
let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
let rng = &mut OsRng;
let current_block_height = self.ledger.latest_block_height();
let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
(true, Some(sha)) => Some(sha),
_ => None,
};
let our_nonce = rng.r#gen();
let our_request =
ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
if let Some(reason) = self
.verify_challenge_response(
peer_addr,
peer_request.address,
peer_request.node_type,
peer_response,
genesis_header,
restrictions_id,
our_nonce,
)
.await
{
send(&mut framed, peer_addr, reason.into()).await?;
return Err(reason.into_connect_error(peer_addr));
}
if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
send(&mut framed, peer_addr, reason.into()).await?;
return Err(reason.into_connect_error(peer_addr));
}
let response_nonce: u64 = rng.r#gen();
let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
};
let our_response = ChallengeResponse {
genesis_header,
restrictions_id,
signature: Data::Object(our_signature),
nonce: response_nonce,
};
send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
Ok(peer_request)
}
async fn handshake_inner_responder<'a>(
&'a self,
peer_addr: SocketAddr,
listener_addr: &mut Option<SocketAddr>,
stream: &'a mut TcpStream,
genesis_header: Header<N>,
restrictions_id: Field<N>,
) -> Result<ChallengeRequest<N>, ConnectError> {
let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());
let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);
let current_block_height = self.ledger.latest_block_height();
let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
let snarkos_sha = match (consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
(true, Some(sha)) => Some(sha),
_ => None,
};
*listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
let listener_addr = listener_addr.unwrap();
if let Err(reason) = self.ensure_peer_is_allowed(listener_addr) {
send(&mut framed, peer_addr, reason.into()).await?;
return Err(reason.into_connect_error(listener_addr));
}
self.add_connecting_peer(listener_addr)?;
if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
send(&mut framed, peer_addr, reason.into()).await?;
return Err(reason.into_connect_error(peer_addr));
}
let rng = &mut OsRng;
let response_nonce: u64 = rng.r#gen();
let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
return Err(ConnectError::Other(
anyhow!("Failed to sign the challenge request nonce from '{peer_addr}'").into(),
));
};
let our_response = ChallengeResponse {
genesis_header,
restrictions_id,
signature: Data::Object(our_signature),
nonce: response_nonce,
};
send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;
let our_nonce = rng.r#gen();
let our_request =
ChallengeRequest::new(self.local_ip().port(), self.node_type, self.address(), our_nonce, snarkos_sha);
send(&mut framed, peer_addr, Message::ChallengeRequest(our_request)).await?;
let peer_response = expect_message!(Message::ChallengeResponse, framed, peer_addr);
if let Some(reason) = self
.verify_challenge_response(
peer_addr,
peer_request.address,
peer_request.node_type,
peer_response,
genesis_header,
restrictions_id,
our_nonce,
)
.await
{
send(&mut framed, peer_addr, reason.into()).await?;
Err(reason.into_connect_error(peer_addr))
} else {
Ok(peer_request)
}
}
/// Checks whether a peer with the given listening address may connect to this node.
///
/// Returns `Err(DisconnectReason::SelfConnect)` when the address is one of our own, and
/// `Err(DisconnectReason::NoExternalPeersAllowed)` when either
/// - this node is a validator and the address is neither trusted nor a bootstrap peer, or
/// - this node only accepts trusted peers and the address is not trusted.
///
/// Returns `Ok(())` otherwise.
fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
    if self.is_local_ip(listener_addr) {
        // The node must not connect to itself.
        Err(DisconnectReason::SelfConnect)
    } else if self.node_type() == NodeType::Validator
        && !(self.is_trusted(listener_addr)
            || crate::bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr))
    {
        // Validators only accept trusted peers and bootstrap peers.
        Err(DisconnectReason::NoExternalPeersAllowed)
    } else if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
        // In trusted-peers-only mode every untrusted peer is refused.
        Err(DisconnectReason::NoExternalPeersAllowed)
    } else {
        Ok(())
    }
}
/// Validates the challenge request received from the peer at `peer_addr`.
///
/// The peer's reported repo commit hash is first passed to `log_repo_sha_comparison`.
/// The request is then rejected with
/// - `DisconnectReason::OutdatedClientVersion` if its message version is not valid, or
/// - `DisconnectReason::NoReasonGiven` if both this node and the peer are validators and
///   the peer's address is already connected.
///
/// Returns `None` when the request is acceptable.
fn verify_challenge_request(
    &self,
    peer_addr: SocketAddr,
    message: &ChallengeRequest<N>,
) -> Option<DisconnectReason> {
    // Copy out the fields that are also interpolated into the log messages below.
    let version = message.version;
    let address = message.address;

    log_repo_sha_comparison(peer_addr, &message.snarkos_sha, Self::OWNER);

    if !self.is_valid_message_version(version) {
        warn!("Dropping '{peer_addr}' on version {version} (outdated)");
        Some(DisconnectReason::OutdatedClientVersion)
    } else if self.node_type() == NodeType::Validator
        && message.node_type == NodeType::Validator
        && self.is_connected_address(address)
    {
        // A validator does not accept a second connection from the same validator address.
        warn!("Dropping '{peer_addr}' for being already connected ({address})");
        Some(DisconnectReason::NoReasonGiven)
    } else {
        None
    }
}
/// Validates the challenge response received from the peer at `peer_addr`.
///
/// The checks run in this order, and the first failure yields
/// `Some(DisconnectReason::InvalidChallengeResponse)`:
/// 1. the peer's genesis header must equal `expected_genesis_header`;
/// 2. unless the peer or this node is a prover, the peer's restrictions ID must equal
///    `expected_restrictions_id`;
/// 3. the peer's signature must deserialize;
/// 4. the signature must verify, under `peer_address`, over the little-endian bytes of
///    `expected_nonce` followed by the little-endian bytes of the response's nonce.
///
/// Returns `None` when every check passes.
#[allow(clippy::too_many_arguments)]
async fn verify_challenge_response(
    &self,
    peer_addr: SocketAddr,
    peer_address: Address<N>,
    peer_node_type: NodeType,
    response: ChallengeResponse<N>,
    expected_genesis_header: Header<N>,
    expected_restrictions_id: Field<N>,
    expected_nonce: u64,
) -> Option<DisconnectReason> {
    let ChallengeResponse {
        genesis_header: peer_genesis_header,
        restrictions_id: peer_restrictions_id,
        signature: peer_signature,
        nonce: peer_nonce,
    } = response;

    // Both sides must agree on the genesis header.
    if peer_genesis_header != expected_genesis_header {
        warn!("Handshake with '{peer_addr}' failed (incorrect block header)");
        return Some(DisconnectReason::InvalidChallengeResponse);
    }

    // The restrictions ID is only compared when neither side is a prover.
    if !(peer_node_type.is_prover() || self.node_type.is_prover())
        && peer_restrictions_id != expected_restrictions_id
    {
        warn!("Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
        return Some(DisconnectReason::InvalidChallengeResponse);
    }

    let signature = match peer_signature.deserialize().await {
        Ok(signature) => signature,
        Err(_) => {
            warn!("Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        }
    };

    // The peer must have signed our nonce followed by the nonce it chose itself.
    let signed_bytes = [expected_nonce.to_le_bytes(), peer_nonce.to_le_bytes()].concat();
    if signature.verify_bytes(&peer_address, &signed_bytes) {
        None
    } else {
        warn!("Handshake with '{peer_addr}' failed (invalid signature)");
        Some(DisconnectReason::InvalidChallengeResponse)
    }
}
}