snarkos_node_router/
lib.rs1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27use snarkos_utilities::NodeDataDir;
28
29mod handshake;
30
31mod heartbeat;
32pub use heartbeat::*;
33
34mod helpers;
35pub use helpers::*;
36
37mod inbound;
38pub use inbound::*;
39
40mod outbound;
41pub use outbound::*;
42
43mod routing;
44pub use routing::*;
45
46mod writing;
47
48use crate::messages::{BlockRequest, Message, MessageCodec};
49
50use snarkos_account::Account;
51use snarkos_node_bft_ledger_service::LedgerService;
52use snarkos_node_network::{
53 ConnectedPeer,
54 ConnectionMode,
55 NodeType,
56 Peer,
57 PeerPoolHandling,
58 Resolver,
59 bootstrap_peers,
60};
61use snarkos_node_sync_communication_service::CommunicationService;
62use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
63
64use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
65
66use anyhow::Result;
67#[cfg(feature = "locktick")]
68use locktick::parking_lot::{Mutex, RwLock};
69#[cfg(not(feature = "locktick"))]
70use parking_lot::{Mutex, RwLock};
71use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc, time::Duration};
72use tokio::task::JoinHandle;
73
/// The default node port, `4130`.
///
/// NOTE(review): the code that consumes this default is not visible in this file — confirm
/// against callers before changing the value.
pub const DEFAULT_NODE_PORT: u16 = 4130;
76
/// A handle to the router state.
///
/// This is a newtype around `Arc<InnerRouter<N>>`, so cloning a `Router` clones the `Arc`
/// (a new handle to the same `InnerRouter`) rather than duplicating the underlying state.
#[derive(Clone)]
pub struct Router<N: Network>(Arc<InnerRouter<N>>);
82
83impl<N: Network> Deref for Router<N> {
84 type Target = Arc<InnerRouter<N>>;
85
86 fn deref(&self) -> &Self::Target {
87 &self.0
88 }
89}
90
91impl<N: Network> PeerPoolHandling<N> for Router<N> {
92 const MAXIMUM_POOL_SIZE: usize = 10_000;
93 const OWNER: &str = "[Router]";
94 const PEER_SLASHING_COUNT: usize = 200;
95
96 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
97 &self.peer_pool
98 }
99
100 fn resolver(&self) -> &RwLock<Resolver<N>> {
101 &self.resolver
102 }
103
104 fn is_dev(&self) -> bool {
105 self.is_dev
106 }
107
108 fn trusted_peers_only(&self) -> bool {
109 self.trusted_peers_only
110 }
111
112 fn node_type(&self) -> NodeType {
113 self.node_type
114 }
115}
116
/// The state shared by every clone of a `Router`.
pub struct InnerRouter<N: Network> {
    /// The TCP stack; built from the node's IP and the peer limit in `Router::new`.
    tcp: Tcp,
    /// The type of this node; selects the minimum accepted message version.
    node_type: NodeType,
    /// The account of this node; source of its private key, view key, and address.
    account: Account<N>,
    /// The ledger service; queried for the latest block height.
    ledger: Arc<dyn LedgerService<N>>,
    /// The router cache.
    /// NOTE(review): what is cached is defined by `Cache`, which is not visible here.
    cache: Cache<N>,
    /// The resolver; used to look up a listener address from a connected address.
    resolver: RwLock<Resolver<N>>,
    /// The peers known to the router, keyed by socket address. Seeded in `Router::new` with
    /// candidate peers (cached and trusted).
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The handles of the tasks started via `Router::spawn`; aborted in `Router::shut_down`.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// If set, the peer cache is not loaded in `Router::new`, so only the trusted peers are
    /// seeded into the pool.
    trusted_peers_only: bool,
    /// The node's data directory; provides the path of the router's peer cache.
    node_data_dir: NodeDataDir,
    /// NOTE(review): presumably whether the node runs in development mode — confirm with callers.
    is_dev: bool,
}
141
// Router-wide tuning constants. Their consumers are not visible in this file.
impl<N: Network> Router<N> {
    /// NOTE(review): by its name, the length in seconds of the window over which connection
    /// attempts are counted — confirm at the use site.
    /// Only compiled when the `test` feature is disabled.
    #[cfg(not(feature = "test"))]
    const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
    /// NOTE(review): by its name, the maximum number of connection attempts tolerated —
    /// confirm at the use site.
    /// Only compiled when the `test` feature is disabled.
    #[cfg(not(feature = "test"))]
    const MAX_CONNECTION_ATTEMPTS: usize = 10;
    /// 150 seconds (2.5 minutes).
    /// NOTE(review): by its name, how long a peer may stay silent — confirm at the use site.
    const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150);
}
153
154impl<N: Network> Router<N> {
155 #[allow(clippy::too_many_arguments)]
157 pub async fn new(
158 node_ip: SocketAddr,
159 node_type: NodeType,
160 account: Account<N>,
161 ledger: Arc<dyn LedgerService<N>>,
162 trusted_peers: &[SocketAddr],
163 max_peers: u16,
164 trusted_peers_only: bool,
165 node_data_dir: NodeDataDir,
166 is_dev: bool,
167 ) -> Result<Self> {
168 let tcp = Tcp::new(Config::new(node_ip, max_peers));
170
171 let mut initial_peers = HashMap::new();
173
174 if !trusted_peers_only {
176 let cached_peers = Self::load_cached_peers(&node_data_dir.router_peer_cache_path())?;
177 for addr in cached_peers {
178 initial_peers.insert(addr, Peer::new_candidate(addr, false));
179 }
180 }
181
182 initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
185
186 Ok(Self(Arc::new(InnerRouter {
188 tcp,
189 node_type,
190 account,
191 ledger,
192 cache: Default::default(),
193 resolver: Default::default(),
194 peer_pool: RwLock::new(initial_peers),
195 handles: Default::default(),
196 trusted_peers_only,
197 node_data_dir,
198 is_dev,
199 })))
200 }
201}
202
203impl<N: Network> Router<N> {
204 pub fn is_valid_message_version(&self, message_version: u32) -> bool {
206 let lowest_accepted_message_version = match self.node_type {
210 NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
213 NodeType::Validator | NodeType::Client => {
215 Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
216 }
217 };
218
219 message_version >= lowest_accepted_message_version
221 }
222
223 pub fn private_key(&self) -> &PrivateKey<N> {
225 self.account.private_key()
226 }
227
228 pub fn view_key(&self) -> &ViewKey<N> {
230 self.account.view_key()
231 }
232
233 pub fn address(&self) -> Address<N> {
235 self.account.address()
236 }
237
238 pub fn cache(&self) -> &Cache<N> {
240 &self.cache
241 }
242
243 pub fn trusted_peers_only(&self) -> bool {
245 self.trusted_peers_only
246 }
247
248 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
250 self.resolver.read().get_listener(connected_addr)
251 }
252
253 pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
255 self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
256 }
257
258 #[cfg(feature = "metrics")]
259 pub fn update_metrics(&self) {
260 metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
261 metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
262 }
263
264 pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
265 if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
266 peer.update_last_seen();
267 }
268 }
269
270 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
272 self.handles.lock().push(tokio::spawn(future));
273 }
274
275 pub async fn shut_down(&self) {
277 info!("Shutting down the router...");
278 if let Err(e) =
280 self.save_best_peers(&self.node_data_dir.router_peer_cache_path(), Some(MAX_PEERS_TO_SEND), true)
281 {
282 warn!("Failed to persist best peers to disk: {e}");
283 }
284 self.handles.lock().iter().for_each(|handle| handle.abort());
286 self.tcp.shut_down().await;
288 }
289}
290
291#[async_trait]
292impl<N: Network> CommunicationService for Router<N> {
293 type Message = Message<N>;
295
296 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
298 debug_assert!(start_height < end_height, "Invalid block request format");
299 Message::BlockRequest(BlockRequest { start_height, end_height })
300 }
301
302 async fn send(
308 &self,
309 peer_ip: SocketAddr,
310 message: Self::Message,
311 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
312 self.send(peer_ip, message)
313 }
314}