Skip to main content

snarkos_node_router/
lib.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27use snarkos_utilities::NodeDataDir;
28
29mod handshake;
30
31mod heartbeat;
32pub use heartbeat::*;
33
34mod helpers;
35pub use helpers::*;
36
37mod inbound;
38pub use inbound::*;
39
40mod outbound;
41pub use outbound::*;
42
43mod routing;
44pub use routing::*;
45
46mod writing;
47
48use crate::messages::{BlockRequest, Message, MessageCodec};
49
50use snarkos_account::Account;
51use snarkos_node_bft_ledger_service::LedgerService;
52use snarkos_node_network::{
53    CandidatePeer,
54    ConnectedPeer,
55    ConnectionMode,
56    NodeType,
57    Peer,
58    PeerPoolHandling,
59    Resolver,
60    bootstrap_peers,
61};
62use snarkos_node_sync_communication_service::CommunicationService;
63use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
64
65use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
66
67use anyhow::Result;
68#[cfg(feature = "locktick")]
69use locktick::parking_lot::{Mutex, RwLock};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc};
73use tokio::task::JoinHandle;
74
/// Default port on which the router operates.
pub const DEFAULT_NODE_PORT: u16 = 4_130;

78/// The router keeps track of connected and connecting peers.
79/// The actual network communication happens in Inbound/Outbound,
80/// which is implemented by Validator, Prover, and Client.
81#[derive(Clone)]
82pub struct Router<N: Network>(Arc<InnerRouter<N>>);
83
84impl<N: Network> Deref for Router<N> {
85    type Target = Arc<InnerRouter<N>>;
86
87    fn deref(&self) -> &Self::Target {
88        &self.0
89    }
90}
91
92impl<N: Network> PeerPoolHandling<N> for Router<N> {
93    const MAXIMUM_POOL_SIZE: usize = 10_000;
94    const OWNER: &str = "[Router]";
95    const PEER_SLASHING_COUNT: usize = 200;
96
97    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
98        &self.peer_pool
99    }
100
101    fn resolver(&self) -> &RwLock<Resolver<N>> {
102        &self.resolver
103    }
104
105    fn is_dev(&self) -> bool {
106        self.is_dev
107    }
108
109    fn trusted_peers_only(&self) -> bool {
110        self.trusted_peers_only
111    }
112
113    fn node_type(&self) -> NodeType {
114        self.node_type
115    }
116}
117
118pub struct InnerRouter<N: Network> {
119    /// The TCP stack.
120    tcp: Tcp,
121    /// The node type.
122    node_type: NodeType,
123    /// The account of the node.
124    account: Account<N>,
125    /// The ledger service.
126    ledger: Arc<dyn LedgerService<N>>,
127    /// The cache.
128    cache: Cache<N>,
129    /// The resolver.
130    resolver: RwLock<Resolver<N>>,
131    /// The collection of both candidate and connected peers.
132    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
133    /// The spawned handles.
134    handles: Mutex<Vec<JoinHandle<()>>>,
135    /// If the flag is set, the node will only connect to trusted peers.
136    trusted_peers_only: bool,
137    /// The storage mode.
138    node_data_dir: NodeDataDir,
139    /// The boolean flag for the development mode.
140    is_dev: bool,
141}
142
143impl<N: Network> Router<N> {
144    /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
145    #[cfg(not(feature = "test"))]
146    const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
147    /// The maximum amount of connection attempts within a 10 second threshold.
148    #[cfg(not(feature = "test"))]
149    const MAX_CONNECTION_ATTEMPTS: usize = 10;
150}
151
152impl<N: Network> Router<N> {
153    /// Initializes a new `Router` instance.
154    #[allow(clippy::too_many_arguments)]
155    pub async fn new(
156        node_ip: SocketAddr,
157        node_type: NodeType,
158        account: Account<N>,
159        ledger: Arc<dyn LedgerService<N>>,
160        trusted_peers: &[SocketAddr],
161        max_peers: u16,
162        trusted_peers_only: bool,
163        node_data_dir: NodeDataDir,
164        is_dev: bool,
165    ) -> Result<Self> {
166        // Initialize the TCP stack.
167        let tcp = Tcp::new(Config::new(node_ip, max_peers));
168
169        // Prepare the collection of the initial peers.
170        let mut initial_peers = HashMap::new();
171
172        // Load entries from the peer cache (if present and if we are not in trusted peers only mode).
173        if !trusted_peers_only {
174            let cached_peers = Self::load_cached_peers(&node_data_dir.router_peer_cache_path())?;
175            for addr in cached_peers {
176                initial_peers.insert(addr, Peer::new_candidate(addr, false));
177            }
178        }
179
180        // Add the trusted peers to the list of the initial peers; this may promote
181        // some of the cached peers to trusted ones.
182        initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
183
184        // Initialize the router.
185        Ok(Self(Arc::new(InnerRouter {
186            tcp,
187            node_type,
188            account,
189            ledger,
190            cache: Default::default(),
191            resolver: Default::default(),
192            peer_pool: RwLock::new(initial_peers),
193            handles: Default::default(),
194            trusted_peers_only,
195            node_data_dir,
196            is_dev,
197        })))
198    }
199}
200
201impl<N: Network> Router<N> {
202    /// Returns `true` if the message version is valid.
203    pub fn is_valid_message_version(&self, message_version: u32) -> bool {
204        // Determine the minimum message version this node will accept, based on its role.
205        // - Provers always operate at the latest message version.
206        // - Validators and clients may accept older versions, depending on their current block height.
207        let lowest_accepted_message_version = match self.node_type {
208            // Provers should always use the latest version. The bootstrap clients are forced to
209            // be strict, as they don't follow the current chain height.
210            NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
211            // Validators and clients accept messages from lower version based on the migration height.
212            NodeType::Validator | NodeType::Client => {
213                Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
214            }
215        };
216
217        // Check if the incoming message version is valid.
218        message_version >= lowest_accepted_message_version
219    }
220
221    /// Returns the account private key of the node.
222    pub fn private_key(&self) -> &PrivateKey<N> {
223        self.account.private_key()
224    }
225
226    /// Returns the account view key of the node.
227    pub fn view_key(&self) -> &ViewKey<N> {
228        self.account.view_key()
229    }
230
231    /// Returns the account address of the node.
232    pub fn address(&self) -> Address<N> {
233        self.account.address()
234    }
235
236    /// Returns a reference to the cache.
237    pub fn cache(&self) -> &Cache<N> {
238        &self.cache
239    }
240
241    /// Returns a reference to the ledger.
242    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
243        &self.ledger
244    }
245
246    /// Returns `true` if the node is only engaging with trusted peers.
247    pub fn trusted_peers_only(&self) -> bool {
248        self.trusted_peers_only
249    }
250
251    /// Returns the listener IP address from the (ambiguous) peer address.
252    pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
253        self.resolver.read().get_listener(connected_addr)
254    }
255
256    /// Returns the list of metrics for the connected peers.
257    pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
258        self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
259    }
260
261    #[cfg(feature = "metrics")]
262    pub fn update_metrics(&self) {
263        metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
264        metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
265    }
266
267    pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
268        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
269            peer.update_last_seen();
270        }
271    }
272
273    /// Spawns a task with the given future; it should only be used for long-running tasks.
274    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
275        self.handles.lock().push(tokio::spawn(future));
276    }
277
278    /// Shuts down the router.
279    pub async fn shut_down(&self) {
280        info!("Shutting down the router...");
281        // Save the best peers for future use.
282        if let Err(e) =
283            self.save_best_peers(&self.node_data_dir.router_peer_cache_path(), Some(MAX_PEERS_TO_SEND), true)
284        {
285            warn!("Failed to persist best peers to disk: {e}");
286        }
287        // Abort the tasks.
288        self.handles.lock().iter().for_each(|handle| handle.abort());
289        // Close the listener.
290        self.tcp.shut_down().await;
291    }
292}
293
294#[async_trait]
295impl<N: Network> CommunicationService for Router<N> {
296    /// The message type.
297    type Message = Message<N>;
298
299    /// Prepares a block request to be sent.
300    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
301        debug_assert!(start_height < end_height, "Invalid block request format");
302        Message::BlockRequest(BlockRequest { start_height, end_height })
303    }
304
305    /// Sends the given message to specified peer.
306    ///
307    /// This function returns as soon as the message is queued to be sent,
308    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
309    /// which can be used to determine when and whether the message has been delivered.
310    async fn send(
311        &self,
312        peer_ip: SocketAddr,
313        message: Self::Message,
314    ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
315        self.send(peer_ip, message)
316    }
317}