snarkos_node_router/
lib.rs1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27use snarkos_utilities::NodeDataDir;
28
29mod handshake;
30
31mod heartbeat;
32pub use heartbeat::*;
33
34mod helpers;
35pub use helpers::*;
36
37mod inbound;
38pub use inbound::*;
39
40mod outbound;
41pub use outbound::*;
42
43mod routing;
44pub use routing::*;
45
46mod writing;
47
48use crate::messages::{BlockRequest, Message, MessageCodec};
49
50use snarkos_account::Account;
51use snarkos_node_bft_ledger_service::LedgerService;
52use snarkos_node_network::{
53 CandidatePeer,
54 ConnectedPeer,
55 ConnectionMode,
56 NodeType,
57 Peer,
58 PeerPoolHandling,
59 Resolver,
60 bootstrap_peers,
61};
62use snarkos_node_sync_communication_service::CommunicationService;
63use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
64
65use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
66
67use anyhow::Result;
68#[cfg(feature = "locktick")]
69use locktick::parking_lot::{Mutex, RwLock};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc, time::Duration};
73use tokio::task::JoinHandle;
74
/// The default port number for a node: `4130`.
///
/// NOTE(review): nothing in this file reads the constant; presumably callers
/// use it as the fallback listening port when none is configured - confirm.
pub const DEFAULT_NODE_PORT: u16 = 4130;
77
78#[derive(Clone)]
82pub struct Router<N: Network>(Arc<InnerRouter<N>>);
83
84impl<N: Network> Deref for Router<N> {
85 type Target = Arc<InnerRouter<N>>;
86
87 fn deref(&self) -> &Self::Target {
88 &self.0
89 }
90}
91
92impl<N: Network> PeerPoolHandling<N> for Router<N> {
93 const MAXIMUM_POOL_SIZE: usize = 10_000;
94 const OWNER: &str = "[Router]";
95 const PEER_SLASHING_COUNT: usize = 200;
96
97 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
98 &self.peer_pool
99 }
100
101 fn resolver(&self) -> &RwLock<Resolver<N>> {
102 &self.resolver
103 }
104
105 fn is_dev(&self) -> bool {
106 self.is_dev
107 }
108
109 fn trusted_peers_only(&self) -> bool {
110 self.trusted_peers_only
111 }
112
113 fn node_type(&self) -> NodeType {
114 self.node_type
115 }
116}
117
118pub struct InnerRouter<N: Network> {
119 tcp: Tcp,
121 node_type: NodeType,
123 account: Account<N>,
125 ledger: Arc<dyn LedgerService<N>>,
127 cache: Cache<N>,
129 resolver: RwLock<Resolver<N>>,
131 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
133 handles: Mutex<Vec<JoinHandle<()>>>,
135 trusted_peers_only: bool,
137 node_data_dir: NodeDataDir,
139 is_dev: bool,
141}
142
143impl<N: Network> Router<N> {
144 #[cfg(not(feature = "test"))]
146 const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
147 #[cfg(not(feature = "test"))]
149 const MAX_CONNECTION_ATTEMPTS: usize = 10;
150 const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150); }
154
155impl<N: Network> Router<N> {
156 #[allow(clippy::too_many_arguments)]
158 pub async fn new(
159 node_ip: SocketAddr,
160 node_type: NodeType,
161 account: Account<N>,
162 ledger: Arc<dyn LedgerService<N>>,
163 trusted_peers: &[SocketAddr],
164 max_peers: u16,
165 trusted_peers_only: bool,
166 node_data_dir: NodeDataDir,
167 is_dev: bool,
168 ) -> Result<Self> {
169 let tcp = Tcp::new(Config::new(node_ip, max_peers));
171
172 let mut initial_peers = HashMap::new();
174
175 if !trusted_peers_only {
177 let cached_peers = Self::load_cached_peers(&node_data_dir.router_peer_cache_path())?;
178 for addr in cached_peers {
179 initial_peers.insert(addr, Peer::new_candidate(addr, false));
180 }
181 }
182
183 initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
186
187 Ok(Self(Arc::new(InnerRouter {
189 tcp,
190 node_type,
191 account,
192 ledger,
193 cache: Default::default(),
194 resolver: Default::default(),
195 peer_pool: RwLock::new(initial_peers),
196 handles: Default::default(),
197 trusted_peers_only,
198 node_data_dir,
199 is_dev,
200 })))
201 }
202}
203
204impl<N: Network> Router<N> {
205 pub fn is_valid_message_version(&self, message_version: u32) -> bool {
207 let lowest_accepted_message_version = match self.node_type {
211 NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
214 NodeType::Validator | NodeType::Client => {
216 Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
217 }
218 };
219
220 message_version >= lowest_accepted_message_version
222 }
223
224 pub fn private_key(&self) -> &PrivateKey<N> {
226 self.account.private_key()
227 }
228
229 pub fn view_key(&self) -> &ViewKey<N> {
231 self.account.view_key()
232 }
233
234 pub fn address(&self) -> Address<N> {
236 self.account.address()
237 }
238
239 pub fn cache(&self) -> &Cache<N> {
241 &self.cache
242 }
243
244 pub fn trusted_peers_only(&self) -> bool {
246 self.trusted_peers_only
247 }
248
249 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
251 self.resolver.read().get_listener(connected_addr)
252 }
253
254 pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
256 self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
257 }
258
259 #[cfg(feature = "metrics")]
260 pub fn update_metrics(&self) {
261 metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
262 metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
263 }
264
265 pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
266 if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
267 peer.update_last_seen();
268 }
269 }
270
271 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
273 self.handles.lock().push(tokio::spawn(future));
274 }
275
276 pub async fn shut_down(&self) {
278 info!("Shutting down the router...");
279 if let Err(e) =
281 self.save_best_peers(&self.node_data_dir.router_peer_cache_path(), Some(MAX_PEERS_TO_SEND), true)
282 {
283 warn!("Failed to persist best peers to disk: {e}");
284 }
285 self.handles.lock().iter().for_each(|handle| handle.abort());
287 self.tcp.shut_down().await;
289 }
290}
291
292#[async_trait]
293impl<N: Network> CommunicationService for Router<N> {
294 type Message = Message<N>;
296
297 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
299 debug_assert!(start_height < end_height, "Invalid block request format");
300 Message::BlockRequest(BlockRequest { start_height, end_height })
301 }
302
303 async fn send(
309 &self,
310 peer_ip: SocketAddr,
311 message: Self::Message,
312 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
313 self.send(peer_ip, message)
314 }
315}