Skip to main content

snarkos_node_router/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27use snarkos_utilities::NodeDataDir;
28
29mod handshake;
30
31mod heartbeat;
32pub use heartbeat::*;
33
34mod helpers;
35pub use helpers::*;
36
37mod inbound;
38pub use inbound::*;
39
40mod outbound;
41pub use outbound::*;
42
43mod routing;
44pub use routing::*;
45
46mod writing;
47
48use crate::messages::{BlockRequest, Message, MessageCodec};
49
50use snarkos_account::Account;
51use snarkos_node_bft_ledger_service::LedgerService;
52use snarkos_node_network::{
53    ConnectedPeer,
54    ConnectionMode,
55    NodeType,
56    Peer,
57    PeerPoolHandling,
58    Resolver,
59    bootstrap_peers,
60};
61use snarkos_node_sync_communication_service::CommunicationService;
62use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
63
64use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
65
66use anyhow::Result;
67#[cfg(feature = "locktick")]
68use locktick::parking_lot::{Mutex, RwLock};
69#[cfg(not(feature = "locktick"))]
70use parking_lot::{Mutex, RwLock};
71use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc, time::Duration};
72use tokio::task::JoinHandle;
73
/// The default port used by the router.
///
/// NOTE(review): this constant is not referenced by the code visible in this file;
/// presumably it is consumed by callers when building the node's listening address — confirm.
pub const DEFAULT_NODE_PORT: u16 = 4130;
76
/// The router keeps track of connected and connecting peers.
/// The actual network communication happens in Inbound/Outbound,
/// which are implemented by Validator, Prover, and Client.
///
/// This type is a thin handle around a reference-counted [`InnerRouter`]:
/// cloning it only clones the `Arc`, so every clone shares the same inner state.
#[derive(Clone)]
pub struct Router<N: Network>(Arc<InnerRouter<N>>);
82
83impl<N: Network> Deref for Router<N> {
84    type Target = Arc<InnerRouter<N>>;
85
86    fn deref(&self) -> &Self::Target {
87        &self.0
88    }
89}
90
91impl<N: Network> PeerPoolHandling<N> for Router<N> {
92    const MAXIMUM_POOL_SIZE: usize = 10_000;
93    const OWNER: &str = "[Router]";
94    const PEER_SLASHING_COUNT: usize = 200;
95
96    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
97        &self.peer_pool
98    }
99
100    fn resolver(&self) -> &RwLock<Resolver<N>> {
101        &self.resolver
102    }
103
104    fn is_dev(&self) -> bool {
105        self.is_dev
106    }
107
108    fn trusted_peers_only(&self) -> bool {
109        self.trusted_peers_only
110    }
111
112    fn node_type(&self) -> NodeType {
113        self.node_type
114    }
115}
116
/// The shared state behind a [`Router`] handle.
pub struct InnerRouter<N: Network> {
    /// The TCP stack.
    tcp: Tcp,
    /// The node type.
    node_type: NodeType,
    /// The account of the node.
    account: Account<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The cache.
    cache: Cache<N>,
    /// The resolver.
    resolver: RwLock<Resolver<N>>,
    /// The collection of both candidate and connected peers.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The spawned handles.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// If the flag is set, the node will only connect to trusted peers.
    trusted_peers_only: bool,
    /// The node's data directory; the router uses it to locate the peer cache file.
    node_data_dir: NodeDataDir,
    /// The boolean flag for the development mode.
    is_dev: bool,
}
141
/// Tunable limits used by the router.
///
/// The two connection-attempt constants are only compiled when the `test` feature is disabled.
/// NOTE(review): their `test`-feature counterparts are not visible in this file — presumably
/// they are defined elsewhere; confirm.
impl<N: Network> Router<N> {
    /// The minimum permitted interval (in seconds) between connection attempts for an IP;
    /// anything shorter is considered malicious.
    #[cfg(not(feature = "test"))]
    const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
    /// The maximum amount of connection attempts within a 10 second threshold.
    #[cfg(not(feature = "test"))]
    const MAX_CONNECTION_ATTEMPTS: usize = 10;
    /// The duration after which a connected peer is considered inactive or
    /// disconnected if no message has been received in the meantime.
    const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150); // 2.5 minutes
}
153
154impl<N: Network> Router<N> {
155    /// Initializes a new `Router` instance.
156    #[allow(clippy::too_many_arguments)]
157    pub async fn new(
158        node_ip: SocketAddr,
159        node_type: NodeType,
160        account: Account<N>,
161        ledger: Arc<dyn LedgerService<N>>,
162        trusted_peers: &[SocketAddr],
163        max_peers: u16,
164        trusted_peers_only: bool,
165        node_data_dir: NodeDataDir,
166        is_dev: bool,
167    ) -> Result<Self> {
168        // Initialize the TCP stack.
169        let tcp = Tcp::new(Config::new(node_ip, max_peers));
170
171        // Prepare the collection of the initial peers.
172        let mut initial_peers = HashMap::new();
173
174        // Load entries from the peer cache (if present and if we are not in trusted peers only mode).
175        if !trusted_peers_only {
176            let cached_peers = Self::load_cached_peers(&node_data_dir.router_peer_cache_path())?;
177            for addr in cached_peers {
178                initial_peers.insert(addr, Peer::new_candidate(addr, false));
179            }
180        }
181
182        // Add the trusted peers to the list of the initial peers; this may promote
183        // some of the cached peers to trusted ones.
184        initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
185
186        // Initialize the router.
187        Ok(Self(Arc::new(InnerRouter {
188            tcp,
189            node_type,
190            account,
191            ledger,
192            cache: Default::default(),
193            resolver: Default::default(),
194            peer_pool: RwLock::new(initial_peers),
195            handles: Default::default(),
196            trusted_peers_only,
197            node_data_dir,
198            is_dev,
199        })))
200    }
201}
202
203impl<N: Network> Router<N> {
204    /// Returns `true` if the message version is valid.
205    pub fn is_valid_message_version(&self, message_version: u32) -> bool {
206        // Determine the minimum message version this node will accept, based on its role.
207        // - Provers always operate at the latest message version.
208        // - Validators and clients may accept older versions, depending on their current block height.
209        let lowest_accepted_message_version = match self.node_type {
210            // Provers should always use the latest version. The bootstrap clients are forced to
211            // be strict, as they don't follow the current chain height.
212            NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
213            // Validators and clients accept messages from lower version based on the migration height.
214            NodeType::Validator | NodeType::Client => {
215                Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
216            }
217        };
218
219        // Check if the incoming message version is valid.
220        message_version >= lowest_accepted_message_version
221    }
222
223    /// Returns the account private key of the node.
224    pub fn private_key(&self) -> &PrivateKey<N> {
225        self.account.private_key()
226    }
227
228    /// Returns the account view key of the node.
229    pub fn view_key(&self) -> &ViewKey<N> {
230        self.account.view_key()
231    }
232
233    /// Returns the account address of the node.
234    pub fn address(&self) -> Address<N> {
235        self.account.address()
236    }
237
238    /// Returns a reference to the cache.
239    pub fn cache(&self) -> &Cache<N> {
240        &self.cache
241    }
242
243    /// Returns `true` if the node is only engaging with trusted peers.
244    pub fn trusted_peers_only(&self) -> bool {
245        self.trusted_peers_only
246    }
247
248    /// Returns the listener IP address from the (ambiguous) peer address.
249    pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
250        self.resolver.read().get_listener(connected_addr)
251    }
252
253    /// Returns the list of metrics for the connected peers.
254    pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
255        self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
256    }
257
258    #[cfg(feature = "metrics")]
259    pub fn update_metrics(&self) {
260        metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
261        metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
262    }
263
264    pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
265        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
266            peer.update_last_seen();
267        }
268    }
269
270    /// Spawns a task with the given future; it should only be used for long-running tasks.
271    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
272        self.handles.lock().push(tokio::spawn(future));
273    }
274
275    /// Shuts down the router.
276    pub async fn shut_down(&self) {
277        info!("Shutting down the router...");
278        // Save the best peers for future use.
279        if let Err(e) =
280            self.save_best_peers(&self.node_data_dir.router_peer_cache_path(), Some(MAX_PEERS_TO_SEND), true)
281        {
282            warn!("Failed to persist best peers to disk: {e}");
283        }
284        // Abort the tasks.
285        self.handles.lock().iter().for_each(|handle| handle.abort());
286        // Close the listener.
287        self.tcp.shut_down().await;
288    }
289}
290
291#[async_trait]
292impl<N: Network> CommunicationService for Router<N> {
293    /// The message type.
294    type Message = Message<N>;
295
296    /// Prepares a block request to be sent.
297    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
298        debug_assert!(start_height < end_height, "Invalid block request format");
299        Message::BlockRequest(BlockRequest { start_height, end_height })
300    }
301
302    /// Sends the given message to specified peer.
303    ///
304    /// This function returns as soon as the message is queued to be sent,
305    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
306    /// which can be used to determine when and whether the message has been delivered.
307    async fn send(
308        &self,
309        peer_ip: SocketAddr,
310        message: Self::Message,
311    ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
312        self.send(peer_ip, message)
313    }
314}