Skip to main content

snarkos_node_router/
lib.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27use snarkos_utilities::NodeDataDir;
28
29mod handshake;
30
31mod heartbeat;
32pub use heartbeat::*;
33
34mod helpers;
35pub use helpers::*;
36
37mod inbound;
38pub use inbound::*;
39
40mod outbound;
41pub use outbound::*;
42
43mod routing;
44pub use routing::*;
45
46mod writing;
47
48use crate::messages::{BlockRequest, Message, MessageCodec};
49
50use snarkos_account::Account;
51use snarkos_node_bft_ledger_service::LedgerService;
52use snarkos_node_network::{
53    CandidatePeer,
54    ConnectedPeer,
55    ConnectionMode,
56    NodeType,
57    Peer,
58    PeerPoolHandling,
59    Resolver,
60    bootstrap_peers,
61};
62use snarkos_node_sync_communication_service::CommunicationService;
63use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
64
65use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
66
67use anyhow::Result;
68#[cfg(feature = "locktick")]
69use locktick::parking_lot::{Mutex, RwLock};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc, time::Duration};
73use tokio::task::JoinHandle;
74
/// Port number the router uses by default.
pub const DEFAULT_NODE_PORT: u16 = 4_130;
77
78/// The router keeps track of connected and connecting peers.
79/// The actual network communication happens in Inbound/Outbound,
80/// which is implemented by Validator, Prover, and Client.
81#[derive(Clone)]
82pub struct Router<N: Network>(Arc<InnerRouter<N>>);
83
84impl<N: Network> Deref for Router<N> {
85    type Target = Arc<InnerRouter<N>>;
86
87    fn deref(&self) -> &Self::Target {
88        &self.0
89    }
90}
91
92impl<N: Network> PeerPoolHandling<N> for Router<N> {
93    const MAXIMUM_POOL_SIZE: usize = 10_000;
94    const OWNER: &str = "[Router]";
95    const PEER_SLASHING_COUNT: usize = 200;
96
97    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
98        &self.peer_pool
99    }
100
101    fn resolver(&self) -> &RwLock<Resolver<N>> {
102        &self.resolver
103    }
104
105    fn is_dev(&self) -> bool {
106        self.is_dev
107    }
108
109    fn trusted_peers_only(&self) -> bool {
110        self.trusted_peers_only
111    }
112
113    fn node_type(&self) -> NodeType {
114        self.node_type
115    }
116}
117
118pub struct InnerRouter<N: Network> {
119    /// The TCP stack.
120    tcp: Tcp,
121    /// The node type.
122    node_type: NodeType,
123    /// The account of the node.
124    account: Account<N>,
125    /// The ledger service.
126    ledger: Arc<dyn LedgerService<N>>,
127    /// The cache.
128    cache: Cache<N>,
129    /// The resolver.
130    resolver: RwLock<Resolver<N>>,
131    /// The collection of both candidate and connected peers.
132    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
133    /// The spawned handles.
134    handles: Mutex<Vec<JoinHandle<()>>>,
135    /// If the flag is set, the node will only connect to trusted peers.
136    trusted_peers_only: bool,
137    /// The storage mode.
138    node_data_dir: NodeDataDir,
139    /// The boolean flag for the development mode.
140    is_dev: bool,
141}
142
143impl<N: Network> Router<N> {
144    /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
145    #[cfg(not(feature = "test"))]
146    const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
147    /// The maximum amount of connection attempts within a 10 second threshold.
148    #[cfg(not(feature = "test"))]
149    const MAX_CONNECTION_ATTEMPTS: usize = 10;
150    /// The duration after which a connected peer is considered inactive or
151    /// disconnected if no message has been received in the meantime.
152    const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150); // 2.5 minutes
153}
154
155impl<N: Network> Router<N> {
156    /// Initializes a new `Router` instance.
157    #[allow(clippy::too_many_arguments)]
158    pub async fn new(
159        node_ip: SocketAddr,
160        node_type: NodeType,
161        account: Account<N>,
162        ledger: Arc<dyn LedgerService<N>>,
163        trusted_peers: &[SocketAddr],
164        max_peers: u16,
165        trusted_peers_only: bool,
166        node_data_dir: NodeDataDir,
167        is_dev: bool,
168    ) -> Result<Self> {
169        // Initialize the TCP stack.
170        let tcp = Tcp::new(Config::new(node_ip, max_peers));
171
172        // Prepare the collection of the initial peers.
173        let mut initial_peers = HashMap::new();
174
175        // Load entries from the peer cache (if present and if we are not in trusted peers only mode).
176        if !trusted_peers_only {
177            let cached_peers = Self::load_cached_peers(&node_data_dir.router_peer_cache_path())?;
178            for addr in cached_peers {
179                initial_peers.insert(addr, Peer::new_candidate(addr, false));
180            }
181        }
182
183        // Add the trusted peers to the list of the initial peers; this may promote
184        // some of the cached peers to trusted ones.
185        initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
186
187        // Initialize the router.
188        Ok(Self(Arc::new(InnerRouter {
189            tcp,
190            node_type,
191            account,
192            ledger,
193            cache: Default::default(),
194            resolver: Default::default(),
195            peer_pool: RwLock::new(initial_peers),
196            handles: Default::default(),
197            trusted_peers_only,
198            node_data_dir,
199            is_dev,
200        })))
201    }
202}
203
204impl<N: Network> Router<N> {
205    /// Returns `true` if the message version is valid.
206    pub fn is_valid_message_version(&self, message_version: u32) -> bool {
207        // Determine the minimum message version this node will accept, based on its role.
208        // - Provers always operate at the latest message version.
209        // - Validators and clients may accept older versions, depending on their current block height.
210        let lowest_accepted_message_version = match self.node_type {
211            // Provers should always use the latest version. The bootstrap clients are forced to
212            // be strict, as they don't follow the current chain height.
213            NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
214            // Validators and clients accept messages from lower version based on the migration height.
215            NodeType::Validator | NodeType::Client => {
216                Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
217            }
218        };
219
220        // Check if the incoming message version is valid.
221        message_version >= lowest_accepted_message_version
222    }
223
224    /// Returns the account private key of the node.
225    pub fn private_key(&self) -> &PrivateKey<N> {
226        self.account.private_key()
227    }
228
229    /// Returns the account view key of the node.
230    pub fn view_key(&self) -> &ViewKey<N> {
231        self.account.view_key()
232    }
233
234    /// Returns the account address of the node.
235    pub fn address(&self) -> Address<N> {
236        self.account.address()
237    }
238
239    /// Returns a reference to the cache.
240    pub fn cache(&self) -> &Cache<N> {
241        &self.cache
242    }
243
244    /// Returns `true` if the node is only engaging with trusted peers.
245    pub fn trusted_peers_only(&self) -> bool {
246        self.trusted_peers_only
247    }
248
249    /// Returns the listener IP address from the (ambiguous) peer address.
250    pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
251        self.resolver.read().get_listener(connected_addr)
252    }
253
254    /// Returns the list of metrics for the connected peers.
255    pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
256        self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
257    }
258
259    #[cfg(feature = "metrics")]
260    pub fn update_metrics(&self) {
261        metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
262        metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
263    }
264
265    pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
266        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
267            peer.update_last_seen();
268        }
269    }
270
271    /// Spawns a task with the given future; it should only be used for long-running tasks.
272    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
273        self.handles.lock().push(tokio::spawn(future));
274    }
275
276    /// Shuts down the router.
277    pub async fn shut_down(&self) {
278        info!("Shutting down the router...");
279        // Save the best peers for future use.
280        if let Err(e) =
281            self.save_best_peers(&self.node_data_dir.router_peer_cache_path(), Some(MAX_PEERS_TO_SEND), true)
282        {
283            warn!("Failed to persist best peers to disk: {e}");
284        }
285        // Abort the tasks.
286        self.handles.lock().iter().for_each(|handle| handle.abort());
287        // Close the listener.
288        self.tcp.shut_down().await;
289    }
290}
291
292#[async_trait]
293impl<N: Network> CommunicationService for Router<N> {
294    /// The message type.
295    type Message = Message<N>;
296
297    /// Prepares a block request to be sent.
298    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
299        debug_assert!(start_height < end_height, "Invalid block request format");
300        Message::BlockRequest(BlockRequest { start_height, end_height })
301    }
302
303    /// Sends the given message to specified peer.
304    ///
305    /// This function returns as soon as the message is queued to be sent,
306    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
307    /// which can be used to determine when and whether the message has been delivered.
308    async fn send(
309        &self,
310        peer_ip: SocketAddr,
311        message: Self::Message,
312    ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
313        self.send(peer_ip, message)
314    }
315}