snarkos_node_router/
lib.rs1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27use snarkos_utilities::NodeDataDir;
28
29mod handshake;
30
31mod heartbeat;
32pub use heartbeat::*;
33
34mod helpers;
35pub use helpers::*;
36
37mod inbound;
38pub use inbound::*;
39
40mod outbound;
41pub use outbound::*;
42
43mod routing;
44pub use routing::*;
45
46mod writing;
47
48use crate::messages::{BlockRequest, Message, MessageCodec};
49
50use snarkos_account::Account;
51use snarkos_node_bft_ledger_service::LedgerService;
52use snarkos_node_network::{
53 CandidatePeer,
54 ConnectedPeer,
55 ConnectionMode,
56 NodeType,
57 Peer,
58 PeerPoolHandling,
59 Resolver,
60 bootstrap_peers,
61};
62use snarkos_node_sync_communication_service::CommunicationService;
63use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
64
65use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
66
67use anyhow::Result;
68#[cfg(feature = "locktick")]
69use locktick::parking_lot::{Mutex, RwLock};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc};
73use tokio::task::JoinHandle;
74
/// The default port for a node, as the constant's name indicates (`4130`).
// NOTE(review): where this default is applied is not visible in this section.
pub const DEFAULT_NODE_PORT: u16 = 4130;
77
/// A handle to the router's shared state.
///
/// The handle is a thin wrapper around an `Arc<InnerRouter<N>>`, so cloning a
/// `Router` only clones the `Arc`; all clones refer to the same `InnerRouter`.
#[derive(Clone)]
pub struct Router<N: Network>(Arc<InnerRouter<N>>);
83
84impl<N: Network> Deref for Router<N> {
85 type Target = Arc<InnerRouter<N>>;
86
87 fn deref(&self) -> &Self::Target {
88 &self.0
89 }
90}
91
92impl<N: Network> PeerPoolHandling<N> for Router<N> {
93 const MAXIMUM_POOL_SIZE: usize = 10_000;
94 const OWNER: &str = "[Router]";
95 const PEER_SLASHING_COUNT: usize = 200;
96
97 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
98 &self.peer_pool
99 }
100
101 fn resolver(&self) -> &RwLock<Resolver<N>> {
102 &self.resolver
103 }
104
105 fn is_dev(&self) -> bool {
106 self.is_dev
107 }
108
109 fn trusted_peers_only(&self) -> bool {
110 self.trusted_peers_only
111 }
112
113 fn node_type(&self) -> NodeType {
114 self.node_type
115 }
116}
117
/// The state shared by every clone of a `Router`.
pub struct InnerRouter<N: Network> {
    /// The TCP stack; created in `Router::new` from the node's address and the
    /// maximum number of peers, and shut down in `Router::shut_down`.
    tcp: Tcp,
    /// The type of this node; selects the minimum accepted message version.
    node_type: NodeType,
    /// The account of this node; the source of its private key, view key, and address.
    account: Account<N>,
    /// The ledger service; queried for the latest block height.
    ledger: Arc<dyn LedgerService<N>>,
    /// The router's cache.
    // NOTE(review): what is cached is not visible in this section.
    cache: Cache<N>,
    /// The resolver; used to map a connected address to the peer's listener address.
    resolver: RwLock<Resolver<N>>,
    /// The known peers, keyed by socket address. Seeded at construction with the
    /// trusted peers and, unless `trusted_peers_only` is set, the peers loaded from the peer cache.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The handles of the tasks started via `Router::spawn`; aborted on shutdown.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// When set, the cached peers are not loaded at construction.
    // NOTE(review): presumably also restricts which peers may be connected to - confirm.
    trusted_peers_only: bool,
    /// The node's data directory; provides the path of the router's peer cache.
    node_data_dir: NodeDataDir,
    /// The development-mode flag, exposed through `PeerPoolHandling::is_dev`.
    is_dev: bool,
}
142
impl<N: Network> Router<N> {
    // Both constants are compiled out when the `test` feature is enabled.
    // NOTE(review): their use is not visible in this section; presumably a
    // replacement pair with different values exists for test builds - confirm.

    /// The length, in seconds, of the window over which connection attempts are considered.
    // NOTE(review): meaning inferred from the name - confirm against the code that reads it.
    #[cfg(not(feature = "test"))]
    const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
    /// The maximum number of connection attempts.
    // NOTE(review): presumably counted per address within the window above - confirm.
    #[cfg(not(feature = "test"))]
    const MAX_CONNECTION_ATTEMPTS: usize = 10;
}
151
152impl<N: Network> Router<N> {
153 #[allow(clippy::too_many_arguments)]
155 pub async fn new(
156 node_ip: SocketAddr,
157 node_type: NodeType,
158 account: Account<N>,
159 ledger: Arc<dyn LedgerService<N>>,
160 trusted_peers: &[SocketAddr],
161 max_peers: u16,
162 trusted_peers_only: bool,
163 node_data_dir: NodeDataDir,
164 is_dev: bool,
165 ) -> Result<Self> {
166 let tcp = Tcp::new(Config::new(node_ip, max_peers));
168
169 let mut initial_peers = HashMap::new();
171
172 if !trusted_peers_only {
174 let cached_peers = Self::load_cached_peers(&node_data_dir.router_peer_cache_path())?;
175 for addr in cached_peers {
176 initial_peers.insert(addr, Peer::new_candidate(addr, false));
177 }
178 }
179
180 initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
183
184 Ok(Self(Arc::new(InnerRouter {
186 tcp,
187 node_type,
188 account,
189 ledger,
190 cache: Default::default(),
191 resolver: Default::default(),
192 peer_pool: RwLock::new(initial_peers),
193 handles: Default::default(),
194 trusted_peers_only,
195 node_data_dir,
196 is_dev,
197 })))
198 }
199}
200
201impl<N: Network> Router<N> {
202 pub fn is_valid_message_version(&self, message_version: u32) -> bool {
204 let lowest_accepted_message_version = match self.node_type {
208 NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
211 NodeType::Validator | NodeType::Client => {
213 Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
214 }
215 };
216
217 message_version >= lowest_accepted_message_version
219 }
220
221 pub fn private_key(&self) -> &PrivateKey<N> {
223 self.account.private_key()
224 }
225
226 pub fn view_key(&self) -> &ViewKey<N> {
228 self.account.view_key()
229 }
230
231 pub fn address(&self) -> Address<N> {
233 self.account.address()
234 }
235
236 pub fn cache(&self) -> &Cache<N> {
238 &self.cache
239 }
240
241 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
243 &self.ledger
244 }
245
246 pub fn trusted_peers_only(&self) -> bool {
248 self.trusted_peers_only
249 }
250
251 pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
253 self.resolver.read().get_listener(connected_addr)
254 }
255
256 pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
258 self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
259 }
260
261 #[cfg(feature = "metrics")]
262 pub fn update_metrics(&self) {
263 metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
264 metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
265 }
266
267 pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
268 if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
269 peer.update_last_seen();
270 }
271 }
272
273 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
275 self.handles.lock().push(tokio::spawn(future));
276 }
277
278 pub async fn shut_down(&self) {
280 info!("Shutting down the router...");
281 if let Err(e) =
283 self.save_best_peers(&self.node_data_dir.router_peer_cache_path(), Some(MAX_PEERS_TO_SEND), true)
284 {
285 warn!("Failed to persist best peers to disk: {e}");
286 }
287 self.handles.lock().iter().for_each(|handle| handle.abort());
289 self.tcp.shut_down().await;
291 }
292}
293
294#[async_trait]
295impl<N: Network> CommunicationService for Router<N> {
296 type Message = Message<N>;
298
299 fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
301 debug_assert!(start_height < end_height, "Invalid block request format");
302 Message::BlockRequest(BlockRequest { start_height, end_height })
303 }
304
305 async fn send(
311 &self,
312 peer_ip: SocketAddr,
313 message: Self::Message,
314 ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
315 self.send(peer_ip, message)
316 }
317}