snarkos_node_router/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate async_trait;
20#[macro_use]
21extern crate tracing;
22
23#[cfg(feature = "metrics")]
24extern crate snarkos_node_metrics as metrics;
25
26pub use snarkos_node_router_messages as messages;
27
28mod handshake;
29
30mod heartbeat;
31pub use heartbeat::*;
32
33mod helpers;
34pub use helpers::*;
35
36mod inbound;
37pub use inbound::*;
38
39mod outbound;
40pub use outbound::*;
41
42mod routing;
43pub use routing::*;
44
45mod writing;
46
47use crate::messages::{BlockRequest, Message, MessageCodec};
48
49use snarkos_account::Account;
50use snarkos_node_bft_ledger_service::LedgerService;
51use snarkos_node_network::{
52    ConnectedPeer,
53    ConnectionMode,
54    NodeType,
55    Peer,
56    PeerPoolHandling,
57    Resolver,
58    bootstrap_peers,
59};
60use snarkos_node_sync_communication_service::CommunicationService;
61use snarkos_node_tcp::{Config, ConnectionSide, Tcp};
62
63use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};
64
65use aleo_std::StorageMode;
66use anyhow::Result;
67#[cfg(feature = "locktick")]
68use locktick::parking_lot::{Mutex, RwLock};
69#[cfg(not(feature = "locktick"))]
70use parking_lot::{Mutex, RwLock};
71use std::{collections::HashMap, future::Future, io, net::SocketAddr, ops::Deref, sync::Arc, time::Duration};
72use tokio::task::JoinHandle;
73
/// The default port used by the router.
pub const DEFAULT_NODE_PORT: u16 = 4130;

/// The name of the file containing cached peers.
///
/// It is handed, together with the storage mode, to the routines that load the
/// peer cache when the router is created and save it when the router shuts down.
const PEER_CACHE_FILENAME: &str = "cached_router_peers";
79
/// The router keeps track of connected and connecting peers.
///
/// The actual network communication happens in Inbound/Outbound,
/// which are implemented by Validator, Prover, and Client.
///
/// A `Router` is a handle around an `Arc<InnerRouter<N>>`: cloning it
/// produces another handle to the same underlying state.
#[derive(Clone)]
pub struct Router<N: Network>(Arc<InnerRouter<N>>);
85
86impl<N: Network> Deref for Router<N> {
87    type Target = Arc<InnerRouter<N>>;
88
89    fn deref(&self) -> &Self::Target {
90        &self.0
91    }
92}
93
94impl<N: Network> PeerPoolHandling<N> for Router<N> {
95    const MAXIMUM_POOL_SIZE: usize = 10_000;
96    const OWNER: &str = "[Router]";
97    const PEER_SLASHING_COUNT: usize = 200;
98
99    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
100        &self.peer_pool
101    }
102
103    fn resolver(&self) -> &RwLock<Resolver<N>> {
104        &self.resolver
105    }
106
107    fn is_dev(&self) -> bool {
108        self.is_dev
109    }
110
111    fn trusted_peers_only(&self) -> bool {
112        self.trusted_peers_only
113    }
114
115    fn node_type(&self) -> NodeType {
116        self.node_type
117    }
118}
119
/// The state shared by all handles to the same [`Router`].
///
/// `Router` wraps this struct in an `Arc` and dereferences to it, so the
/// fields below are reachable directly from any `Router` handle.
pub struct InnerRouter<N: Network> {
    /// The TCP stack; it is created in `Router::new` from the node's address
    /// and peer limit, and shut down in `Router::shut_down`.
    tcp: Tcp,
    /// The node type; it determines the lowest message version the node accepts.
    node_type: NodeType,
    /// The account of the node, which provides its private key, view key, and address.
    account: Account<N>,
    /// The ledger service; it is queried for the latest block height when
    /// validating message versions.
    ledger: Arc<dyn LedgerService<N>>,
    /// The cache; it starts out in its default state.
    cache: Cache<N>,
    /// The resolver, used to map a connected peer address to its listener address.
    resolver: RwLock<Resolver<N>>,
    /// The collection of both candidate and connected peers, keyed by socket address.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The handles of the tasks spawned via `Router::spawn`; they are aborted on shutdown.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// If the flag is set, the node will only connect to trusted peers, and the
    /// peer cache is not loaded when the router is created.
    trusted_peers_only: bool,
    /// The storage mode; it is passed to the routines that load and save the peer cache.
    storage_mode: StorageMode,
    /// The boolean flag for the development mode.
    is_dev: bool,
}
144
impl<N: Network> Router<N> {
    /// The minimum permitted interval (in seconds) between connection attempts for an IP;
    /// anything shorter is considered malicious.
    ///
    /// This definition only exists when the `test` feature is disabled.
    // NOTE(review): a counterpart for the `test` feature is presumably defined
    // elsewhere in the crate - confirm.
    #[cfg(not(feature = "test"))]
    const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
    /// The maximum number of connection attempts within a 10-second threshold.
    ///
    /// This definition only exists when the `test` feature is disabled.
    // NOTE(review): the threshold presumably refers to `CONNECTION_ATTEMPTS_SINCE_SECS`;
    // verify against the usage site.
    #[cfg(not(feature = "test"))]
    const MAX_CONNECTION_ATTEMPTS: usize = 10;
    /// The duration after which a connected peer is considered inactive or
    /// disconnected if no message has been received in the meantime.
    const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150); // 2.5 minutes
}
156
157impl<N: Network> Router<N> {
158    /// Initializes a new `Router` instance.
159    #[allow(clippy::too_many_arguments)]
160    pub async fn new(
161        node_ip: SocketAddr,
162        node_type: NodeType,
163        account: Account<N>,
164        ledger: Arc<dyn LedgerService<N>>,
165        trusted_peers: &[SocketAddr],
166        max_peers: u16,
167        trusted_peers_only: bool,
168        storage_mode: StorageMode,
169        is_dev: bool,
170    ) -> Result<Self> {
171        // Initialize the TCP stack.
172        let tcp = Tcp::new(Config::new(node_ip, max_peers));
173
174        // Prepare the collection of the initial peers.
175        let mut initial_peers = HashMap::new();
176
177        // Load entries from the peer cache (if present and if we are not in trusted peers only mode).
178        if !trusted_peers_only {
179            let cached_peers = Self::load_cached_peers(&storage_mode, PEER_CACHE_FILENAME)?;
180            for addr in cached_peers {
181                initial_peers.insert(addr, Peer::new_candidate(addr, false));
182            }
183        }
184
185        // Add the trusted peers to the list of the initial peers; this may promote
186        // some of the cached peers to trusted ones.
187        initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
188
189        // Initialize the router.
190        Ok(Self(Arc::new(InnerRouter {
191            tcp,
192            node_type,
193            account,
194            ledger,
195            cache: Default::default(),
196            resolver: Default::default(),
197            peer_pool: RwLock::new(initial_peers),
198            handles: Default::default(),
199            trusted_peers_only,
200            storage_mode,
201            is_dev,
202        })))
203    }
204}
205
206impl<N: Network> Router<N> {
207    /// Returns `true` if the message version is valid.
208    pub fn is_valid_message_version(&self, message_version: u32) -> bool {
209        // Determine the minimum message version this node will accept, based on its role.
210        // - Provers always operate at the latest message version.
211        // - Validators and clients may accept older versions, depending on their current block height.
212        let lowest_accepted_message_version = match self.node_type {
213            // Provers should always use the latest version. The bootstrap clients are forced to
214            // be strict, as they don't follow the current chain height.
215            NodeType::Prover | NodeType::BootstrapClient => Message::<N>::latest_message_version(),
216            // Validators and clients accept messages from lower version based on the migration height.
217            NodeType::Validator | NodeType::Client => {
218                Message::<N>::lowest_accepted_message_version(self.ledger.latest_block_height())
219            }
220        };
221
222        // Check if the incoming message version is valid.
223        message_version >= lowest_accepted_message_version
224    }
225
226    /// Returns the account private key of the node.
227    pub fn private_key(&self) -> &PrivateKey<N> {
228        self.account.private_key()
229    }
230
231    /// Returns the account view key of the node.
232    pub fn view_key(&self) -> &ViewKey<N> {
233        self.account.view_key()
234    }
235
236    /// Returns the account address of the node.
237    pub fn address(&self) -> Address<N> {
238        self.account.address()
239    }
240
241    /// Returns a reference to the cache.
242    pub fn cache(&self) -> &Cache<N> {
243        &self.cache
244    }
245
246    /// Returns `true` if the node is only engaging with trusted peers.
247    pub fn trusted_peers_only(&self) -> bool {
248        self.trusted_peers_only
249    }
250
251    /// Returns the listener IP address from the (ambiguous) peer address.
252    pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option<SocketAddr> {
253        self.resolver.read().get_listener(connected_addr)
254    }
255
256    /// Returns the list of metrics for the connected peers.
257    pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> {
258        self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect()
259    }
260
261    #[cfg(feature = "metrics")]
262    pub fn update_metrics(&self) {
263        metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64);
264        metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64);
265    }
266
267    pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) {
268        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
269            peer.update_last_seen();
270        }
271    }
272
273    /// Spawns a task with the given future; it should only be used for long-running tasks.
274    pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
275        self.handles.lock().push(tokio::spawn(future));
276    }
277
278    /// Shuts down the router.
279    pub async fn shut_down(&self) {
280        info!("Shutting down the router...");
281        // Save the best peers for future use.
282        if let Err(e) = self.save_best_peers(&self.storage_mode, PEER_CACHE_FILENAME, Some(MAX_PEERS_TO_SEND)) {
283            warn!("Failed to persist best peers to disk: {e}");
284        }
285        // Abort the tasks.
286        self.handles.lock().iter().for_each(|handle| handle.abort());
287        // Close the listener.
288        self.tcp.shut_down().await;
289    }
290}
291
292#[async_trait]
293impl<N: Network> CommunicationService for Router<N> {
294    /// The message type.
295    type Message = Message<N>;
296
297    /// Prepares a block request to be sent.
298    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
299        debug_assert!(start_height < end_height, "Invalid block request format");
300        Message::BlockRequest(BlockRequest { start_height, end_height })
301    }
302
303    /// Sends the given message to specified peer.
304    ///
305    /// This function returns as soon as the message is queued to be sent,
306    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
307    /// which can be used to determine when and whether the message has been delivered.
308    async fn send(
309        &self,
310        peer_ip: SocketAddr,
311        message: Self::Message,
312    ) -> Option<tokio::sync::oneshot::Receiver<io::Result<()>>> {
313        self.send(peer_ip, message)
314    }
315}