snarkos_node_router/
lib.rs1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27
28mod handshake;
29
30mod heartbeat;
31pub use heartbeat::*;
32
33mod helpers;
34pub use helpers::*;
35
36mod inbound;
37pub use inbound::*;
38
39mod outbound;
40pub use outbound::*;
41
42mod routing;
43pub use routing::*;
44
45mod writing;
46
47use crate::messages::{BlockRequest, Message, MessageCodec};
48
49use snarkos_account::Account;
50use snarkos_node_bft_ledger_service::LedgerService;
51use snarkos_node_network::{
52 ConnectedPeer,
53 ConnectionMode,
54 NodeType,
55 Peer,
56 PeerPoolHandling,
57 Resolver,
58 bootstrap_peers,
59};
60use snarkos_node_sync_communication_service::CommunicationService;
61use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
62
63use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
64
65use aleo_std::StorageMode;
66use anyhow::Result;
67#[cfg(feature = "locktick")]
68use locktick::parking_lot::{Mutex, RwLock};
69#[cfg(not(feature = "locktick"))]
70use parking_lot::{Mutex, RwLock};
71use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc, time::Duration};
72use tokio::task::JoinHandle;
73
74pub const DEFAULT_NODE_PORT: u16 = 4130;
76
77const PEER_CACHE_FILENAME: &str = "cached_router_peers";
79
80#[derive(Clone)]
84pub struct Router<N: Network>(Arc<InnerRouter<N>>);
85
86impl<N: Network> Deref for Router<N> {
87 type Target = Arc<InnerRouter<N>>;
88
89 fn deref(&self) -> &Self::Target {
90 &self.0
91 }
92}
93
94impl<N: Network> PeerPoolHandling<N> for Router<N> {
95 const MAXIMUM_POOL_SIZE: usize = 10_000;
96 const OWNER: &str = "[Router]";
97 const PEER_SLASHING_COUNT: usize = 200;
98
99 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
100 &self.peer_pool
101 }
102
103 fn resolver(&self) -> &RwLock<Resolver<N>> {
104 &self.resolver
105 }
106
107 fn is_dev(&self) -> bool {
108 self.is_dev
109 }
110
111 fn trusted_peers_only(&self) -> bool {
112 self.trusted_peers_only
113 }
114
115 fn node_type(&self) -> NodeType {
116 self.node_type
117 }
118}
119
120pub struct InnerRouter<N: Network> {
121 tcp: Tcp,
123 node_type: NodeType,
125 account: Account<N>,
127 ledger: Arc<dyn LedgerService<N>>,
129 cache: Cache<N>,
131 resolver: RwLock<Resolver<N>>,
133 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
135 handles: Mutex<Vec<JoinHandle<()>>>,
137 trusted_peers_only: bool,
139 storage_mode: StorageMode,
141 is_dev: bool,
143}
144
145impl<N: Network> Router<N> {
146 #[cfg(not(feature = "test"))]
148 const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
149 #[cfg(not(feature = "test"))]
151 const MAX_CONNECTION_ATTEMPTS: usize = 10;
152 const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150); }
156
157impl<N: Network> Router<N> {
158 #[allow(clippy::too_many_arguments)]
160 pub async fn new(
161 node_ip: SocketAddr,
162 node_type: NodeType,
163 account: Account<N>,
164 ledger: Arc<dyn LedgerService<N>>,
165 trusted_peers: &[SocketAddr],
166 max_peers: u16,
167 trusted_peers_only: bool,
168 storage_mode: StorageMode,
169 is_dev: bool,
170 ) -> Result<Self> {
171 let tcp = Tcp::new(Config::new(node_ip, max_peers));
173
174 let mut initial_peers = HashMap::new();
176
177 if !trusted_peers_only {
179 let cached_peers = Self::load_cached_peers(&storage_mode, PEER_CACHE_FILENAME)?;
180 for addr in cached_peers {
181 initial_peers.insert(addr, Peer::new_candidate(addr, false));
182 }
183 }
184
185 initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
188
189 Ok(Self(Arc::new(InnerRouter {
191 tcp,
192 node_type,
193 account,
194 ledger,
195 cache: Default::default(),
196 resolver: Default::default(),
197 peer_pool: RwLock::new(initial_peers),
198 handles: Default::default(),
199 trusted_peers_only,
200 storage_mode,
201 is_dev,
202 })))
203 }
204}
205
206impl<N: Network> Router<N> {
207 pub fn is_valid_message_version(&self, message_version: u32) -> bool {
209 let lowest_accepted_message_version = match self.node_type {
213 NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
216 NodeType::Validator | NodeType::Client => {
218 Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
219 }
220 };
221
222 message_version >= lowest_accepted_message_version
224 }
225
226 pub fn private_key(&self) -> &PrivateKey<N> {
228 self.account.private_key()
229 }
230
231 pub fn view_key(&self) -> &ViewKey<N> {
233 self.account.view_key()
234 }
235
236 pub fn address(&self) -> Address<N> {
238 self.account.address()
239 }
240
241 pub fn cache(&self) -> &Cache<N> {
243 &self.cache
244 }
245
246 pub fn trusted_peers_only(&self) -> bool {
248 self.trusted_peers_only
249 }
250
251 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
253 self.resolver.read().get_listener(connected_addr)
254 }
255
256 pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
258 self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
259 }
260
261 #[cfg(feature = "metrics")]
262 pub fn update_metrics(&self) {
263 metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
264 metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
265 }
266
267 pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
268 if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
269 peer.update_last_seen();
270 }
271 }
272
273 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
275 self.handles.lock().push(tokio::spawn(future));
276 }
277
278 pub async fn shut_down(&self) {
280 info!("Shutting down the router...");
281 if let Err(e) = self.save_best_peers(&self.storage_mode, PEER_CACHE_FILENAME, Some(MAX_PEERS_TO_SEND)) {
283 warn!("Failed to persist best peers to disk: {e}");
284 }
285 self.handles.lock().iter().for_each(|handle| handle.abort());
287 self.tcp.shut_down().await;
289 }
290}
291
292#[async_trait]
293impl<N: Network> CommunicationService for Router<N> {
294 type Message = Message<N>;
296
297 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
299 debug_assert!(start_height < end_height, "Invalid block request format");
300 Message::BlockRequest(BlockRequest { start_height, end_height })
301 }
302
303 async fn send(
309 &self,
310 peer_ip: SocketAddr,
311 message: Self::Message,
312 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
313 self.send(peer_ip, message)
314 }
315}