1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23pub use snarkos_node_router_messages as messages;
24
25mod handshake;
26
27mod heartbeat;
28pub use heartbeat::*;
29
30mod helpers;
31pub use helpers::*;
32
33mod inbound;
34pub use inbound::*;
35
36mod outbound;
37pub use outbound::*;
38
39mod routing;
40pub use routing::*;
41
42use crate::messages::{Message, NodeType};
43use snarkos_account::Account;
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_tcp::{Config, P2P, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip};
46
47use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
48
49use anyhow::{Result, bail};
50#[cfg(feature = "locktick")]
51use locktick::parking_lot::{Mutex, RwLock};
52#[cfg(not(feature = "locktick"))]
53use parking_lot::{Mutex, RwLock};
54#[cfg(not(any(test)))]
55use std::net::IpAddr;
56use std::{
57 collections::{HashMap, HashSet, hash_map::Entry},
58 future::Future,
59 net::SocketAddr,
60 ops::Deref,
61 str::FromStr,
62 sync::Arc,
63 time::Instant,
64};
65use tokio::task::JoinHandle;
66
67#[derive(Clone)]
68pub struct Router<N: Network>(Arc<InnerRouter<N>>);
69
70impl<N: Network> Deref for Router<N> {
71 type Target = Arc<InnerRouter<N>>;
72
73 fn deref(&self) -> &Self::Target {
74 &self.0
75 }
76}
77
78pub struct InnerRouter<N: Network> {
79 tcp: Tcp,
81 node_type: NodeType,
83 account: Account<N>,
85 ledger: Arc<dyn LedgerService<N>>,
87 cache: Cache<N>,
89 resolver: Resolver,
91 trusted_peers: HashSet<SocketAddr>,
93 connected_peers: RwLock<HashMap<SocketAddr, Peer<N>>>,
95 connecting_peers: Mutex<HashMap<SocketAddr, Option<Peer<N>>>>,
100 candidate_peers: RwLock<HashSet<SocketAddr>>,
102 restricted_peers: RwLock<HashMap<SocketAddr, Instant>>,
104 handles: Mutex<Vec<JoinHandle<()>>>,
106 rotate_external_peers: bool,
108 allow_external_peers: bool,
110 is_dev: bool,
112}
113
114impl<N: Network> Router<N> {
115 #[cfg(not(any(test)))]
117 const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
118 const MAXIMUM_CANDIDATE_PEERS: usize = 10_000;
120 const MAXIMUM_CONNECTION_FAILURES: usize = 5;
122 #[cfg(not(any(test)))]
124 const MAX_CONNECTION_ATTEMPTS: usize = 10;
125 const RADIO_SILENCE_IN_SECS: u64 = 150; }
129
130impl<N: Network> Router<N> {
131 #[allow(clippy::too_many_arguments)]
133 pub async fn new(
134 node_ip: SocketAddr,
135 node_type: NodeType,
136 account: Account<N>,
137 ledger: Arc<dyn LedgerService<N>>,
138 trusted_peers: &[SocketAddr],
139 max_peers: u16,
140 rotate_external_peers: bool,
141 allow_external_peers: bool,
142 is_dev: bool,
143 ) -> Result<Self> {
144 let tcp = Tcp::new(Config::new(node_ip, max_peers));
146 Ok(Self(Arc::new(InnerRouter {
148 tcp,
149 node_type,
150 account,
151 ledger,
152 cache: Default::default(),
153 resolver: Default::default(),
154 trusted_peers: trusted_peers.iter().copied().collect(),
155 connected_peers: Default::default(),
156 connecting_peers: Default::default(),
157 candidate_peers: Default::default(),
158 restricted_peers: Default::default(),
159 handles: Default::default(),
160 rotate_external_peers,
161 allow_external_peers,
162 is_dev,
163 })))
164 }
165}
166
167impl<N: Network> Router<N> {
168 pub fn connect(&self, peer_ip: SocketAddr) -> Option<JoinHandle<bool>> {
173 match self.check_connection_attempt(peer_ip) {
175 Ok(true) => return None,
176 Ok(false) => {}
177 Err(forbidden_message) => {
178 warn!("{forbidden_message}");
179 return None;
180 }
181 }
182
183 let router = self.clone();
184 Some(tokio::spawn(async move {
185 match router.tcp.connect(peer_ip).await {
187 Ok(()) => {
189 router.remove_candidate_peer(peer_ip);
190 true
191 }
192 Err(error) => {
194 router.connecting_peers.lock().remove(&peer_ip);
195 warn!("Unable to connect to '{peer_ip}' - {error}");
196 false
197 }
198 }
199 }))
200 }
201
202 fn check_connection_attempt(&self, peer_ip: SocketAddr) -> Result<bool> {
209 if self.is_local_ip(&peer_ip) {
211 bail!("Dropping connection attempt to '{peer_ip}' (attempted to self-connect)")
212 }
213 if self.number_of_connected_peers() >= self.max_connected_peers() {
215 bail!("Dropping connection attempt to '{peer_ip}' (maximum peers reached)")
216 }
217 if self.is_connected(&peer_ip) {
219 debug!("Dropping connection attempt to '{peer_ip}' (already connected)");
220 return Ok(true);
221 }
222 if self.is_restricted(&peer_ip) {
224 bail!("Dropping connection attempt to '{peer_ip}' (restricted)")
225 }
226 match self.connecting_peers.lock().entry(peer_ip) {
228 Entry::Vacant(entry) => {
229 entry.insert(None);
230 Ok(false)
231 }
232 Entry::Occupied(_) => {
233 debug!("Dropping connection attempt to '{peer_ip}' (already shaking hands as the initiator)");
234 Ok(true)
235 }
236 }
237 }
238
239 pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle<bool> {
241 let router = self.clone();
242 tokio::spawn(async move {
243 if let Some(peer_addr) = router.resolve_to_ambiguous(&peer_ip) {
244 let disconnected = router.tcp.disconnect(peer_addr).await;
246 if router.is_connected(&peer_ip) && !router.tcp.is_connected(peer_addr) {
249 warn!("Disconnecting with fallback safety (report this to @ljedrz)");
250 router.remove_connected_peer(peer_ip);
251 }
252 disconnected
253 } else {
254 false
255 }
256 })
257 }
258
259 pub fn local_ip(&self) -> SocketAddr {
261 self.tcp.listening_addr().expect("The TCP listener is not enabled")
262 }
263
264 pub fn is_local_ip(&self, ip: &SocketAddr) -> bool {
266 *ip == self.local_ip()
267 || (ip.ip().is_unspecified() || ip.ip().is_loopback()) && ip.port() == self.local_ip().port()
268 }
269
270 pub fn is_valid_peer_ip(&self, ip: &SocketAddr) -> bool {
272 !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
273 }
274
275 pub fn is_valid_message_version(&self, message_version: u32) -> bool {
277 let lowest_accepted_message_version = match self.node_type {
281 NodeType::Prover => Message::<N>::latest_message_version(),
283 NodeType::Validator | NodeType::Client => {
285 Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
286 }
287 };
288
289 message_version >= lowest_accepted_message_version
291 }
292
293 pub fn node_type(&self) -> NodeType {
295 self.node_type
296 }
297
298 pub fn private_key(&self) -> &PrivateKey<N> {
300 self.account.private_key()
301 }
302
303 pub fn view_key(&self) -> &ViewKey<N> {
305 self.account.view_key()
306 }
307
308 pub fn address(&self) -> Address<N> {
310 self.account.address()
311 }
312
313 pub fn is_dev(&self) -> bool {
315 self.is_dev
316 }
317
318 pub fn rotate_external_peers(&self) -> bool {
320 self.rotate_external_peers
321 }
322
323 pub fn allow_external_peers(&self) -> bool {
325 self.allow_external_peers
326 }
327
328 pub fn resolve_to_listener(&self, peer_addr: &SocketAddr) -> Option<SocketAddr> {
330 self.resolver.get_listener(peer_addr)
331 }
332
333 pub fn resolve_to_ambiguous(&self, peer_ip: &SocketAddr) -> Option<SocketAddr> {
335 self.resolver.get_ambiguous(peer_ip)
336 }
337
338 pub fn is_connected(&self, ip: &SocketAddr) -> bool {
340 self.connected_peers.read().contains_key(ip)
341 }
342
343 pub fn is_connected_validator(&self, peer_ip: &SocketAddr) -> bool {
345 self.connected_peers.read().get(peer_ip).map_or(false, |peer| peer.is_validator())
346 }
347
348 pub fn is_connected_prover(&self, peer_ip: &SocketAddr) -> bool {
350 self.connected_peers.read().get(peer_ip).map_or(false, |peer| peer.is_prover())
351 }
352
353 pub fn is_connected_client(&self, peer_ip: &SocketAddr) -> bool {
355 self.connected_peers.read().get(peer_ip).map_or(false, |peer| peer.is_client())
356 }
357
358 pub fn is_connecting(&self, ip: &SocketAddr) -> bool {
360 self.connecting_peers.lock().contains_key(ip)
361 }
362
363 pub fn is_restricted(&self, ip: &SocketAddr) -> bool {
365 self.restricted_peers
366 .read()
367 .get(ip)
368 .map(|time| time.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS)
369 .unwrap_or(false)
370 }
371
372 pub fn is_trusted(&self, ip: &SocketAddr) -> bool {
374 self.trusted_peers.contains(ip)
375 }
376
377 pub fn max_connected_peers(&self) -> usize {
379 self.tcp.config().max_connections as usize
380 }
381
382 pub fn number_of_connected_peers(&self) -> usize {
384 self.connected_peers.read().len()
385 }
386
387 pub fn number_of_connected_validators(&self) -> usize {
389 self.connected_peers.read().values().filter(|peer| peer.is_validator()).count()
390 }
391
392 pub fn number_of_connected_provers(&self) -> usize {
394 self.connected_peers.read().values().filter(|peer| peer.is_prover()).count()
395 }
396
397 pub fn number_of_connected_clients(&self) -> usize {
399 self.connected_peers.read().values().filter(|peer| peer.is_client()).count()
400 }
401
402 pub fn number_of_candidate_peers(&self) -> usize {
404 self.candidate_peers.read().len()
405 }
406
407 pub fn number_of_restricted_peers(&self) -> usize {
409 self.restricted_peers.read().len()
410 }
411
412 pub fn get_connected_peer(&self, ip: &SocketAddr) -> Option<Peer<N>> {
414 self.connected_peers.read().get(ip).cloned()
415 }
416
417 pub fn get_connected_peers(&self) -> Vec<Peer<N>> {
419 self.connected_peers.read().values().cloned().collect()
420 }
421
422 pub fn connected_peers(&self) -> Vec<SocketAddr> {
424 self.connected_peers.read().keys().copied().collect()
425 }
426
427 pub fn connected_validators(&self) -> Vec<SocketAddr> {
429 self.connected_peers.read().iter().filter(|(_, peer)| peer.is_validator()).map(|(ip, _)| *ip).collect()
430 }
431
432 pub fn connected_provers(&self) -> Vec<SocketAddr> {
434 self.connected_peers.read().iter().filter(|(_, peer)| peer.is_prover()).map(|(ip, _)| *ip).collect()
435 }
436
437 pub fn connected_clients(&self) -> Vec<SocketAddr> {
439 self.connected_peers.read().iter().filter(|(_, peer)| peer.is_client()).map(|(ip, _)| *ip).collect()
440 }
441
442 pub fn candidate_peers(&self) -> HashSet<SocketAddr> {
444 let banned_ips = self.tcp().banned_peers().get_banned_ips();
445 self.candidate_peers.read().iter().filter(|peer| !banned_ips.contains(&peer.ip())).copied().collect()
446 }
447
448 pub fn restricted_peers(&self) -> Vec<SocketAddr> {
450 self.restricted_peers.read().keys().copied().collect()
451 }
452
453 pub fn trusted_peers(&self) -> &HashSet<SocketAddr> {
455 &self.trusted_peers
456 }
457
458 #[allow(clippy::if_same_then_else)]
460 pub fn bootstrap_peers(&self) -> Vec<SocketAddr> {
461 if cfg!(feature = "test") || self.is_dev {
462 vec![]
464 } else if N::ID == snarkvm::console::network::MainnetV0::ID {
465 vec![
467 SocketAddr::from_str("35.231.67.219:4130").unwrap(),
468 SocketAddr::from_str("34.73.195.196:4130").unwrap(),
469 SocketAddr::from_str("34.23.225.202:4130").unwrap(),
470 SocketAddr::from_str("34.148.16.111:4130").unwrap(),
471 ]
472 } else if N::ID == snarkvm::console::network::TestnetV0::ID {
473 vec![
475 SocketAddr::from_str("34.138.104.159:4130").unwrap(),
476 SocketAddr::from_str("35.231.46.237:4130").unwrap(),
477 SocketAddr::from_str("34.148.251.155:4130").unwrap(),
478 SocketAddr::from_str("35.190.141.234:4130").unwrap(),
479 ]
480 } else if N::ID == snarkvm::console::network::CanaryV0::ID {
481 vec![
483 SocketAddr::from_str("34.139.88.58:4130").unwrap(),
484 SocketAddr::from_str("34.139.252.207:4130").unwrap(),
485 SocketAddr::from_str("35.185.98.12:4130").unwrap(),
486 SocketAddr::from_str("35.231.106.26:4130").unwrap(),
487 ]
488 } else {
489 vec![]
491 }
492 }
493
494 #[cfg(not(any(test)))]
496 fn is_ip_banned(&self, ip: IpAddr) -> bool {
497 self.tcp.banned_peers().is_ip_banned(&ip)
498 }
499
500 #[cfg(not(any(test)))]
502 fn update_ip_ban(&self, ip: IpAddr) {
503 self.tcp.banned_peers().update_ip_ban(ip);
504 }
505
506 pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
508 self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect()
509 }
510
511 #[cfg(feature = "metrics")]
512 fn update_metrics(&self) {
513 metrics::gauge(metrics::router::CONNECTED, self.connected_peers.read().len() as f64);
514 metrics::gauge(metrics::router::CANDIDATE, self.candidate_peers.read().len() as f64);
515 metrics::gauge(metrics::router::RESTRICTED, self.restricted_peers.read().len() as f64);
516 }
517
518 pub fn insert_connected_peer(&self, peer_ip: SocketAddr) {
520 let peer = match self.connecting_peers.lock().remove(&peer_ip) {
522 Some(Some(peer)) => peer,
523 Some(None) => {
524 warn!("Couldn't promote {peer_ip} from \"connecting\" to \"connected\": Handshake not completed");
525 return;
526 }
527 None => {
528 warn!("Couldn't promote {peer_ip} from \"connecting\" to \"connected\": Public/listen address unknown");
529 return;
530 }
531 };
532 self.connected_peers.write().insert(peer_ip, peer);
534 self.candidate_peers.write().remove(&peer_ip);
536 self.restricted_peers.write().remove(&peer_ip);
538 #[cfg(feature = "metrics")]
539 self.update_metrics();
540 info!("Connected to '{peer_ip}'");
541 }
542
543 pub fn insert_candidate_peers(&self, peers: &[SocketAddr]) {
548 let max_candidate_peers = Self::MAXIMUM_CANDIDATE_PEERS.saturating_sub(self.number_of_candidate_peers());
550 let eligible_peers = peers
552 .iter()
553 .filter(|peer_ip| {
554 !self.is_local_ip(peer_ip) && !self.is_connected(peer_ip) && !self.is_restricted(peer_ip)
556 })
557 .take(max_candidate_peers);
558
559 self.candidate_peers.write().extend(eligible_peers);
561 #[cfg(feature = "metrics")]
562 self.update_metrics();
563 }
564
565 pub fn insert_restricted_peer(&self, peer_ip: SocketAddr) {
567 self.candidate_peers.write().remove(&peer_ip);
569 self.restricted_peers.write().insert(peer_ip, Instant::now());
571 #[cfg(feature = "metrics")]
572 self.update_metrics();
573 }
574
575 pub fn update_connected_peer<Fn: FnMut(&mut Peer<N>)>(
577 &self,
578 peer_ip: SocketAddr,
579 node_type: NodeType,
580 mut write_fn: Fn,
581 ) -> Result<()> {
582 if let Some(peer) = self.connected_peers.write().get_mut(&peer_ip) {
584 if peer.node_type() != node_type {
586 bail!("Peer '{peer_ip}' has changed node types from {} to {node_type}", peer.node_type())
587 }
588 write_fn(peer);
590 }
591 Ok(())
592 }
593
594 pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
595 if let Some(peer) = self.connected_peers.write().get_mut(&peer_ip) {
596 peer.set_last_seen(Instant::now());
597 }
598 }
599
600 pub fn remove_connected_peer(&self, peer_ip: SocketAddr) {
602 self.resolver.remove_peer(&peer_ip);
604 self.connected_peers.write().remove(&peer_ip);
606 self.candidate_peers.write().insert(peer_ip);
608 self.cache.clear_peer_entries(peer_ip);
610 #[cfg(feature = "metrics")]
611 self.update_metrics();
612 }
613
614 #[cfg(feature = "test")]
615 pub fn clear_candidate_peers(&self) {
616 self.candidate_peers.write().clear();
617 #[cfg(feature = "metrics")]
618 self.update_metrics();
619 }
620
621 pub fn remove_candidate_peer(&self, peer_ip: SocketAddr) {
623 self.candidate_peers.write().remove(&peer_ip);
624 #[cfg(feature = "metrics")]
625 self.update_metrics();
626 }
627
628 pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
630 self.handles.lock().push(tokio::spawn(future));
631 }
632
633 pub async fn shut_down(&self) {
635 info!("Shutting down the router...");
636 self.handles.lock().iter().for_each(|handle| handle.abort());
638 self.tcp.shut_down().await;
640 }
641}