#![forbid(unsafe_code)]
#[macro_use]
extern crate async_trait;
#[macro_use]
extern crate tracing;
#[cfg(feature = "metrics")]
extern crate snarkos_node_metrics as metrics;
pub use snarkos_node_router_messages as messages;
use snarkos_utilities::NodeDataDir;
mod handshake;
mod heartbeat;
pub use heartbeat::*;
mod helpers;
pub use helpers::*;
mod inbound;
pub use inbound::*;
mod outbound;
pub use outbound::*;
mod routing;
pub use routing::*;
mod writing;
use crate::messages::{BlockRequest, Message, MessageCodec};
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_network::{
CandidatePeer,
ConnectedPeer,
ConnectionMode,
NodeType,
Peer,
PeerPoolHandling,
Resolver,
bootstrap_peers,
};
use snarkos_node_sync_communication_service::CommunicationService;
use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
use anyhow::Result;
#[cfg(feature = "locktick")]
use locktick::parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc, time::Duration};
use tokio::task::JoinHandle;
pub const DEFAULT_NODE_PORT: u16 = 4130;
#[derive(Clone)]
pub struct Router<N: Network>(Arc<InnerRouter<N>>);
impl<N: Network> Deref for Router<N> {
type Target = Arc<InnerRouter<N>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<N: Network> PeerPoolHandling<N> for Router<N> {
const MAXIMUM_POOL_SIZE: usize = 10_000;
const OWNER: &str = "[Router]";
const PEER_SLASHING_COUNT: usize = 200;
fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
&self.peer_pool
}
fn resolver(&self) -> &RwLock<Resolver<N>> {
&self.resolver
}
fn is_dev(&self) -> bool {
self.is_dev
}
fn trusted_peers_only(&self) -> bool {
self.trusted_peers_only
}
fn node_type(&self) -> NodeType {
self.node_type
}
}
pub struct InnerRouter<N: Network> {
tcp: Tcp,
node_type: NodeType,
account: Account<N>,
ledger: Arc<dyn LedgerService<N>>,
cache: Cache<N>,
resolver: RwLock<Resolver<N>>,
peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
handles: Mutex<Vec<JoinHandle<()>>>,
trusted_peers_only: bool,
node_data_dir: NodeDataDir,
is_dev: bool,
}
impl<N: Network> Router<N> {
#[cfg(not(feature = "test"))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
#[cfg(not(feature = "test"))]
const MAX_CONNECTION_ATTEMPTS: usize = 10;
const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150); }
impl<N: Network> Router<N> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
node_ip: SocketAddr,
node_type: NodeType,
account: Account<N>,
ledger: Arc<dyn LedgerService<N>>,
trusted_peers: &[SocketAddr],
max_peers: u16,
trusted_peers_only: bool,
node_data_dir: NodeDataDir,
is_dev: bool,
) -> Result<Self> {
let tcp = Tcp::new(Config::new(node_ip, max_peers));
let mut initial_peers = HashMap::new();
if !trusted_peers_only {
let cached_peers = Self::load_cached_peers(&node_data_dir.router_peer_cache_path())?;
for addr in cached_peers {
initial_peers.insert(addr, Peer::new_candidate(addr, false));
}
}
initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
Ok(Self(Arc::new(InnerRouter {
tcp,
node_type,
account,
ledger,
cache: Default::default(),
resolver: Default::default(),
peer_pool: RwLock::new(initial_peers),
handles: Default::default(),
trusted_peers_only,
node_data_dir,
is_dev,
})))
}
}
impl<N: Network> Router<N> {
pub fn is_valid_message_version(&self, message_version: u32) -> bool {
let lowest_accepted_message_version = match self.node_type {
NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
NodeType::Validator | NodeType::Client => {
Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
}
};
message_version >= lowest_accepted_message_version
}
pub fn private_key(&self) -> &PrivateKey<N> {
self.account.private_key()
}
pub fn view_key(&self) -> &ViewKey<N> {
self.account.view_key()
}
pub fn address(&self) -> Address<N> {
self.account.address()
}
pub fn cache(&self) -> &Cache<N> {
&self.cache
}
pub fn trusted_peers_only(&self) -> bool {
self.trusted_peers_only
}
pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
self.resolver.read().get_listener(connected_addr)
}
pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
}
#[cfg(feature = "metrics")]
pub fn update_metrics(&self) {
metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
}
pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
peer.update_last_seen();
}
}
pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
}
pub async fn shut_down(&self) {
info!("Shutting down the router...");
if let Err(e) =
self.save_best_peers(&self.node_data_dir.router_peer_cache_path(), Some(MAX_PEERS_TO_SEND), true)
{
warn!("Failed to persist best peers to disk: {e}");
}
self.handles.lock().iter().for_each(|handle| handle.abort());
self.tcp.shut_down().await;
}
}
#[async_trait]
impl<N: Network> CommunicationService for Router<N> {
type Message = Message<N>;
fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
debug_assert!(start_height < end_height, "Invalid block request format");
Message::BlockRequest(BlockRequest { start_height, end_height })
}
async fn send(
&self,
peer_ip: SocketAddr,
message: Self::Message,
) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
self.send(peer_ip, message)
}
}