Skip to main content

snarkos_node_router/
heartbeat.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    CandidatePeer,
18    ConnectedPeer,
19    NodeType,
20    Outbound,
21    PeerPoolHandling,
22    Router,
23    bootstrap_peers,
24    messages::{DisconnectReason, Message, PeerRequest},
25};
26
27use snarkos_node_tcp::{ConnectError, P2P};
28
29use snarkvm::prelude::Network;
30
31use colored::Colorize;
32use futures::future::join_all;
33use rand::{prelude::IteratorRandom, rngs::OsRng};
34use std::time::Duration;
35use tokio::task::JoinError;
36
/// Returns the larger of `a` and `b` (either one when they are equal).
///
/// `Ord::max` cannot be called in `const` contexts (Rust issue 92391:
/// <https://github.com/rust-lang/rust/issues/92391>), so this `const fn` is used instead when
/// deriving associated constants from other constants.
pub const fn max(a: usize, b: usize) -> usize {
    if b < a { a } else { b }
}
45
46#[async_trait]
47pub trait Heartbeat<N: Network>: Outbound<N> {
48    /// The duration in seconds to sleep in between heartbeat executions.
49    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
50    /// The minimum number of peers required to maintain connections with.
51    const MINIMUM_NUMBER_OF_PEERS: usize = 3;
52    /// The minimum time between connection attempts to a peer.
53    const MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS: Duration = Duration::from_secs(10);
54    /// The time we consider the node to be starting up and avoid certain warnings such as "No connected peers".
55    const STARTUP_GRACE_PERIOD: Duration = Duration::from_secs(60);
56    /// The median number of peers to maintain connections with.
57    const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
58    /// The maximum number of peers permitted to maintain connections with.
59    const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
60    /// The maximum number of provers to maintain connections with.
61    const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
62    /// The amount of time an IP address is prohibited from connecting.
63    const IP_BAN_TIME_IN_SECS: u64 = 300;
64
65    /// Handles the heartbeat request.
66    async fn heartbeat(&self) {
67        self.safety_check_minimum_number_of_peers();
68        self.log_connected_peers();
69
70        // Remove any stale connected peers.
71        self.remove_stale_connected_peers();
72        // Remove the oldest connected peer.
73        self.remove_oldest_connected_peer();
74        // Keep the number of connected peers within the allowed range.
75        self.handle_connected_peers().await;
76        // Keep the bootstrap peers within the allowed range.
77        self.handle_bootstrap_peers().await;
78        // Keep the trusted peers connected.
79        self.handle_trusted_peers().await;
80        // Keep the puzzle request up to date.
81        self.handle_puzzle_request();
82        // Unban any addresses whose ban time has expired.
83        self.handle_banned_ips();
84    }
85
86    /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
87    /// This function performs safety checks on the setting for the minimum number of peers.
88    fn safety_check_minimum_number_of_peers(&self) {
89        // Perform basic sanity checks on the configuration for the number of peers.
90        assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
91        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
92        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
93        assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
94        assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
95    }
96
97    /// This function logs the connected peers.
98    fn log_connected_peers(&self) {
99        // Log the connected peers.
100        let connected_peers = self.router().connected_peers();
101        let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
102        match connected_peers.len() {
103            0 => {
104                // Only log a warning if the node has been running for a while.
105                if self.router().tcp().uptime() > Self::STARTUP_GRACE_PERIOD {
106                    warn!("No connected peers")
107                }
108            }
109            1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
110            num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
111        }
112    }
113
114    /// This function removes any connected peers that have not communicated within the predefined time.
115    fn remove_stale_connected_peers(&self) {
116        // Check if any connected peer is stale.
117        for peer in self.router().get_connected_peers() {
118            // Disconnect if the peer has not communicated back within the predefined time.
119            let elapsed = peer.last_seen.elapsed();
120            if elapsed > Router::<N>::MAX_RADIO_SILENCE {
121                warn!("Peer '{}' has not communicated in {elapsed:?}", peer.listener_addr);
122                // Disconnect from this peer.
123                self.router().disconnect(peer.listener_addr);
124            }
125        }
126    }
127
128    /// Returns a sorted vector of network addresses of all removable connected peers
129    /// where the first entry has the lowest priority and the last one the highest.
130    ///
131    /// Rules:
132    ///     - Trusted peers and bootstrap nodes are not removable.
133    ///     - Peers that we are currently syncing with are not removable.
134    ///     - Connections that have not been seen in a while are considered lower priority.
135    fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
136        // Are we synced already? (cache this here, so it does not need to be recomputed)
137        let is_block_synced = self.is_block_synced();
138
139        // Sort by priority, where lowest priority will be at the beginning
140        // of the vector.
141        // Note, that this gives equal priority to clients and provers, which
142        // we might want to change in the future.
143        let mut peers = self.router().filter_connected_peers(|peer| {
144            !peer.trusted
145                && peer.node_type != NodeType::BootstrapClient
146                && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) // This peer is currently syncing from us.
147                && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) // We are currently syncing from this peer.
148        });
149        peers.sort_by_key(|peer| peer.last_seen);
150
151        peers
152    }
153
154    /// This function removes the peer that we have not heard from the longest,
155    /// to keep the connections fresh.
156    /// It only triggers if the router is above the minimum number of connected peers.
157    fn remove_oldest_connected_peer(&self) {
158        // Skip if the node is not requesting peers.
159        if self.router().trusted_peers_only() {
160            return;
161        }
162
163        // Skip if the router is at or below the minimum number of connected peers.
164        if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
165            return;
166        }
167
168        // Disconnect from the oldest connected peer, which is the first entry in the list
169        // of removable peers.
170        // Do nothing, if the list is empty.
171        if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
172            info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
173            let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
174            self.router().disconnect(oldest);
175        }
176    }
177
178    /// Logs a message with the error and `context` if the connection attempt failed,
179    /// and sets the log level based on the severity of the error.
180    #[inline]
181    fn log_if_connect_error(result: Result<Result<(), ConnectError>, JoinError>, context: &str) {
182        match result {
183            // Success!
184            Ok(Ok(())) => {}
185            Ok(Err(err @ ConnectError::AlreadyConnecting { .. }))
186            | Ok(Err(err @ ConnectError::AlreadyConnected { .. })) => {
187                // Log benign errors at a lower level.
188                debug!("{context}: {err}");
189            }
190            // Print regular connection errors (such as "connection refused" as warnings)
191            Ok(Err(err)) => warn!("{context}: {err}"),
192            // Print join errors as error, as they most likely indicate a crash.
193            Err(err) => error!("{context}: {err}"),
194        }
195    }
196
197    /// This function keeps the number of connected peers within the allowed range.
198    async fn handle_connected_peers(&self) {
199        // Initialize an RNG.
200        let rng = &mut OsRng;
201
202        // Obtain the number of connected peers.
203        let num_connected = self.router().number_of_connected_peers();
204        // Obtain the number of connected provers.
205        let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
206
207        // Determine the maximum number of peers and provers to keep.
208        let (max_peers, max_provers) = (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS);
209
210        // Compute the number of surplus peers.
211        let num_surplus_peers = num_connected.saturating_sub(max_peers);
212        // Compute the number of surplus provers.
213        let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
214        // Compute the number of provers remaining connected.
215        let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
216        // Compute the number of surplus clients and validators.
217        let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
218
219        if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
220            debug!(
221                "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
222            );
223
224            // Determine the provers to disconnect from.
225            let provers_to_disconnect = self
226                .router()
227                .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted)
228                .into_iter()
229                .choose_multiple(rng, num_surplus_provers);
230
231            // Determine the clients and validators to disconnect from.
232            let peers_to_disconnect = self
233                .get_removable_peers()
234                .into_iter()
235                .filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately
236                .take(num_surplus_clients_validators);
237
238            // Proceed to send disconnect requests to these peers.
239            for peer in peers_to_disconnect.chain(provers_to_disconnect) {
240                // TODO (howardwu): Remove this after specializing this function.
241                if self.router().node_type().is_prover() {
242                    continue;
243                }
244
245                let peer_addr = peer.listener_addr;
246                info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
247                self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
248                // Disconnect from this peer.
249                self.router().disconnect(peer_addr);
250            }
251        }
252
253        // Obtain the number of connected peers.
254        let num_connected = self.router().number_of_connected_peers();
255        // Compute the number of deficit peers.
256        let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
257
258        if num_deficient > 0 {
259            // Initialize an RNG.
260            let rng = &mut OsRng;
261
262            // Attempt to connect to more peers, separately choosing from those at a greater block
263            // height, and those whose height is lower or unknown to us.
264            let own_height = self.router().ledger.latest_block_height();
265            let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
266                .router()
267                .get_candidate_peers()
268                .into_iter()
269                .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
270            // We may not know of half of `num_deficient` candidates; account for it using `min`.
271            let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
272
273            let higher_peers = higher_peers.into_iter().choose_multiple(rng, num_higher_peers);
274            let other_peers =
275                other_peers.into_iter().choose_multiple(rng, num_deficient.saturating_sub(num_higher_peers));
276
277            // Initiate connection attempts and wait for them to complete.
278            self.try_connect_to_peers(higher_peers.into_iter().chain(other_peers)).await;
279
280            if !self.router().trusted_peers_only() {
281                // Request more peers from the connected peers.
282                for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
283                    self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
284                }
285            }
286        }
287    }
288
289    /// This function keeps the number of bootstrap peers within the allowed range.
290    async fn handle_bootstrap_peers(&self) {
291        // Return early if we are in trusted peers only mode.
292        if self.router().trusted_peers_only() {
293            return;
294        }
295        // Split the bootstrap peers into connected and candidate lists.
296        let mut candidate_bootstrap = Vec::new();
297        let connected_bootstrap =
298            self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
299        for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) {
300            if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
301                candidate_bootstrap.push(bootstrap_ip);
302            }
303        }
304        // If there are not enough connected bootstrap peers, connect to more.
305        if connected_bootstrap.is_empty() {
306            // Initialize an RNG.
307            let rng = &mut OsRng;
308            // Attempt to connect to a random bootstrap peer.
309            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
310                match self.router().connect(peer_ip) {
311                    Ok(hdl) => {
312                        Self::log_if_connect_error(
313                            hdl.await,
314                            &format!("Could not connect to bootstrap peer at '{peer_ip:?}'"),
315                        );
316                    }
317                    Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => {}
318                    Err(err) => warn!("Could not initiate connection to bootstrap peer at '{peer_ip:?}' - {err}"),
319                }
320            }
321        }
322        // Determine if the node is connected to more bootstrap peers than allowed.
323        let num_surplus = connected_bootstrap.len().saturating_sub(1);
324        if num_surplus > 0 {
325            // Initialize an RNG.
326            let rng = &mut OsRng;
327            // Proceed to send disconnect requests to these bootstrap peers.
328            for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
329                info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
330                self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
331                // Disconnect from this peer.
332                self.router().disconnect(peer.listener_addr);
333            }
334        }
335    }
336
337    /// Helper function that attempts to connect the given peers.
338    ///
339    /// Used by [`Self::handle_trusted_peers`] and [`Self::handle_connected_peers`].
340    async fn try_connect_to_peers(&self, peers: impl Iterator<Item = CandidatePeer> + Send + 'static) {
341        let (peer_info, hdls): (Vec<_>, Vec<_>) = peers
342            .filter_map(|peer| {
343                let peer_type = if peer.trusted { "trusted peer" } else { "peer" };
344
345                // Do not attempt to reconnect too frequently.
346                // TODO (kaimast): Consider increasing the minimum time based on the number of failed attempts.
347                if let Some(last_connection_attempt) = peer.last_connection_attempt
348                    && last_connection_attempt.elapsed() < Self::MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS
349                {
350                    return None;
351                }
352
353                // Get the peers address.
354                let addr = peer.listener_addr;
355                let attempt_no = peer.total_connection_attempts + 1;
356
357                // Start connection attempt.
358                debug!("(Re-)connecting to {peer_type} '{addr}' (attempt #{attempt_no})");
359                match self.router().connect(addr) {
360                    Ok(hdl) => Some(((addr, attempt_no, peer_type), hdl)),
361                    Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => None,
362                    Err(err) => {
363                        warn!("Could not initiate connection to {peer_type} at '{addr}' - {err}");
364                        None
365                    }
366                }
367            })
368            .unzip();
369
370        // Wait for all the connection attempts to complete.
371        for ((peer_addr, attempt_no, peer_type), result) in peer_info.into_iter().zip(join_all(hdls).await) {
372            Self::log_if_connect_error(
373                result,
374                &format!("Could not connect to {peer_type} at '{peer_addr}' (attempt #{attempt_no})"),
375            );
376        }
377    }
378
379    /// This function attempts to connect to any disconnected trusted peers.
380    async fn handle_trusted_peers(&self) {
381        self.try_connect_to_peers(self.router().get_trusted_candidate_peers().into_iter()).await;
382    }
383
384    /// This function updates the puzzle if network has updated.
385    fn handle_puzzle_request(&self) {
386        // No-op
387    }
388
389    // Remove addresses whose ban time has expired.
390    fn handle_banned_ips(&self) {
391        self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
392    }
393}