Skip to main content

snarkos_node_router/
heartbeat.rs

// Copyright (c) 2019-2026 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
16use crate::{
17    CandidatePeer,
18    ConnectedPeer,
19    NodeType,
20    Outbound,
21    PeerPoolHandling,
22    bootstrap_peers,
23    messages::{DisconnectReason, Message, PeerRequest},
24};
25
26use snarkos_node_tcp::{ConnectError, P2P};
27
28use snarkvm::prelude::Network;
29
30use colored::Colorize;
31use futures::future::join_all;
32use rand::{SeedableRng, prelude::IteratorRandom};
33use rand_chacha::ChaChaRng;
34use std::time::Duration;
35use tokio::task::JoinError;
36
/// Returns the larger of `a` and `b`; when they are equal, `b` is returned.
///
/// This exists because `Ord::max` cannot be called in a `const` context.
/// See Rust issue 92391: https://github.com/rust-lang/rust/issues/92391.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
45
46#[async_trait]
47pub trait Heartbeat<N: Network>: Outbound<N> {
48    /// The duration in seconds to sleep in between heartbeat executions.
49    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
50    /// The minimum number of peers required to maintain connections with.
51    const MINIMUM_NUMBER_OF_PEERS: usize = 3;
52    /// The minimum time between connection attempts to a peer.
53    const MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS: Duration = Duration::from_secs(10);
54    /// The time we consider the node to be starting up and avoid certain warnings such as "No connected peers".
55    const STARTUP_GRACE_PERIOD: Duration = Duration::from_secs(60);
56    /// The median number of peers to maintain connections with.
57    const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
58    /// The maximum number of peers permitted to maintain connections with.
59    const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
60    /// The maximum number of provers to maintain connections with.
61    const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
62    /// The amount of time an IP address is prohibited from connecting.
63    const IP_BAN_TIME_IN_SECS: u64 = 300;
64
65    /// Handles the heartbeat request.
66    async fn heartbeat(&self) {
67        self.safety_check_minimum_number_of_peers();
68        self.log_connected_peers();
69
70        // Remove the oldest connected peer.
71        self.remove_oldest_connected_peer();
72        // Keep the number of connected peers within the allowed range.
73        self.handle_connected_peers().await;
74        // Keep the bootstrap peers within the allowed range.
75        self.handle_bootstrap_peers().await;
76        // Keep the trusted peers connected.
77        self.handle_trusted_peers().await;
78        // Keep the puzzle request up to date.
79        self.handle_puzzle_request();
80        // Unban any addresses whose ban time has expired.
81        self.handle_banned_ips();
82    }
83
84    /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
85    /// This function performs safety checks on the setting for the minimum number of peers.
86    fn safety_check_minimum_number_of_peers(&self) {
87        // Perform basic sanity checks on the configuration for the number of peers.
88        assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
89        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
90        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
91        assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
92        assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
93    }
94
95    /// This function logs the connected peers.
96    fn log_connected_peers(&self) {
97        // Log the connected peers.
98        let connected_peers = self.router().connected_peers();
99        let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
100        match connected_peers.len() {
101            0 => {
102                // Only log a warning if the node has been running for a while.
103                if self.router().tcp().uptime() > Self::STARTUP_GRACE_PERIOD {
104                    warn!("No connected peers")
105                }
106            }
107            1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
108            num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
109        }
110    }
111
112    /// Returns a sorted vector of network addresses of all removable connected peers
113    /// where the first entry has the lowest priority and the last one the highest.
114    ///
115    /// Rules:
116    ///     - Trusted peers and bootstrap nodes are not removable.
117    ///     - Peers that we are currently syncing with are not removable.
118    ///     - Connections that have not been seen in a while are considered lower priority.
119    fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
120        // Are we synced already? (cache this here, so it does not need to be recomputed)
121        let is_block_synced = self.is_block_synced();
122
123        // Sort by priority, where lowest priority will be at the beginning
124        // of the vector.
125        // Note, that this gives equal priority to clients and provers, which
126        // we might want to change in the future.
127        let mut peers = self.router().filter_connected_peers(|peer| {
128            !peer.trusted
129                && peer.node_type != NodeType::BootstrapClient
130                && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) // This peer is currently syncing from us.
131                && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) // We are currently syncing from this peer.
132        });
133        peers.sort_by_key(|peer| peer.last_seen);
134
135        peers
136    }
137
138    /// This function removes the peer that we have not heard from the longest,
139    /// to keep the connections fresh.
140    /// It only triggers if the router is above the minimum number of connected peers.
141    fn remove_oldest_connected_peer(&self) {
142        // Skip if the node is not requesting peers.
143        if self.router().trusted_peers_only() {
144            return;
145        }
146
147        // Skip if the router is at or below the minimum number of connected peers.
148        if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
149            return;
150        }
151
152        // Disconnect from the oldest connected peer, which is the first entry in the list
153        // of removable peers.
154        // Do nothing, if the list is empty.
155        if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
156            info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
157            let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
158            self.router().disconnect(oldest);
159        }
160    }
161
162    /// Logs a message with the error and `context` if the connection attempt failed,
163    /// and sets the log level based on the severity of the error.
164    #[inline]
165    fn log_if_connect_error(result: Result<Result<(), ConnectError>, JoinError>, context: &str) {
166        match result {
167            // Success!
168            Ok(Ok(())) => {}
169            Ok(Err(err @ ConnectError::AlreadyConnecting { .. }))
170            | Ok(Err(err @ ConnectError::AlreadyConnected { .. })) => {
171                // Log benign errors at a lower level.
172                debug!("{context}: {err}");
173            }
174            // Print regular connection errors (such as "connection refused" as warnings)
175            Ok(Err(err)) => warn!("{context}: {err}"),
176            // Print join errors as error, as they most likely indicate a crash.
177            Err(err) => error!("{context}: {err}"),
178        }
179    }
180
181    /// This function keeps the number of connected peers within the allowed range.
182    async fn handle_connected_peers(&self) {
183        // Initialize an RNG.
184        let rng = &mut ChaChaRng::from_rng(&mut rand::rng());
185
186        // Obtain the number of connected peers.
187        let num_connected = self.router().number_of_connected_peers();
188        // Obtain the number of connected provers.
189        let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
190
191        // Determine the maximum number of peers and provers to keep.
192        let (max_peers, max_provers) = (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS);
193
194        // Compute the number of surplus peers.
195        let num_surplus_peers = num_connected.saturating_sub(max_peers);
196        // Compute the number of surplus provers.
197        let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
198        // Compute the number of provers remaining connected.
199        let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
200        // Compute the number of surplus clients and validators.
201        let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
202
203        if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
204            debug!(
205                "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
206            );
207
208            // Determine the provers to disconnect from.
209            let provers_to_disconnect = self
210                .router()
211                .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted)
212                .into_iter()
213                .sample(rng, num_surplus_provers);
214
215            // Determine the clients and validators to disconnect from.
216            let peers_to_disconnect = self
217                .get_removable_peers()
218                .into_iter()
219                .filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately
220                .take(num_surplus_clients_validators);
221
222            // Proceed to send disconnect requests to these peers.
223            for peer in peers_to_disconnect.chain(provers_to_disconnect) {
224                // TODO (howardwu): Remove this after specializing this function.
225                if self.router().node_type().is_prover() {
226                    continue;
227                }
228
229                let peer_addr = peer.listener_addr;
230                info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
231                self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
232                // Disconnect from this peer.
233                self.router().disconnect(peer_addr);
234            }
235        }
236
237        // Obtain the number of connected peers.
238        let num_connected = self.router().number_of_connected_peers();
239        // Compute the number of deficit peers.
240        let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
241
242        if num_deficient > 0 {
243            // Initialize an RNG.
244            let rng = &mut ChaChaRng::from_rng(&mut rand::rng());
245
246            // Attempt to connect to more peers, separately choosing from those at a greater block
247            // height, and those whose height is lower or unknown to us.
248            let own_height = self.router().ledger.latest_block_height();
249            let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
250                .router()
251                .get_candidate_peers()
252                .into_iter()
253                .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
254            // We may not know of half of `num_deficient` candidates; account for it using `min`.
255            let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
256
257            let higher_peers = higher_peers.into_iter().sample(rng, num_higher_peers);
258            let other_peers = other_peers.into_iter().sample(rng, num_deficient.saturating_sub(num_higher_peers));
259
260            // Initiate connection attempts and wait for them to complete.
261            self.try_connect_to_peers(higher_peers.into_iter().chain(other_peers)).await;
262
263            if !self.router().trusted_peers_only() {
264                // Request more peers from the connected peers.
265                for peer_ip in self.router().connected_peers().into_iter().sample(rng, 3) {
266                    self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
267                }
268            }
269        }
270    }
271
272    /// This function keeps the number of bootstrap peers within the allowed range.
273    async fn handle_bootstrap_peers(&self) {
274        // Return early if we are in trusted peers only mode.
275        if self.router().trusted_peers_only() {
276            return;
277        }
278        // Split the bootstrap peers into connected and candidate lists.
279        let mut candidate_bootstrap = Vec::new();
280        let connected_bootstrap =
281            self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
282        for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) {
283            if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
284                candidate_bootstrap.push(bootstrap_ip);
285            }
286        }
287        // If there are not enough connected bootstrap peers, connect to more.
288        if connected_bootstrap.is_empty() {
289            // Initialize an RNG.
290            let rng = &mut ChaChaRng::from_rng(&mut rand::rng());
291            // Attempt to connect to a random bootstrap peer.
292            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
293                match self.router().connect(peer_ip) {
294                    Ok(hdl) => {
295                        Self::log_if_connect_error(
296                            hdl.await,
297                            &format!("Could not connect to bootstrap peer at '{peer_ip:?}'"),
298                        );
299                    }
300                    Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => {}
301                    Err(err) => warn!("Could not initiate connection to bootstrap peer at '{peer_ip:?}' - {err}"),
302                }
303            }
304        }
305        // Determine if the node is connected to more bootstrap peers than allowed.
306        let num_surplus = connected_bootstrap.len().saturating_sub(1);
307        if num_surplus > 0 {
308            // Initialize an RNG.
309            let rng = &mut ChaChaRng::from_rng(&mut rand::rng());
310            // Proceed to send disconnect requests to these bootstrap peers.
311            for peer in connected_bootstrap.into_iter().sample(rng, num_surplus) {
312                info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
313                self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
314                // Disconnect from this peer.
315                self.router().disconnect(peer.listener_addr);
316            }
317        }
318    }
319
320    /// Helper function that attempts to connect the given peers.
321    ///
322    /// Used by [`Self::handle_trusted_peers`] and [`Self::handle_connected_peers`].
323    async fn try_connect_to_peers(&self, peers: impl Iterator<Item = CandidatePeer> + Send + 'static) {
324        let (peer_info, hdls): (Vec<_>, Vec<_>) = peers
325            .filter_map(|peer| {
326                let peer_type = if peer.trusted { "trusted peer" } else { "peer" };
327
328                // Do not attempt to reconnect too frequently.
329                // TODO (kaimast): Consider increasing the minimum time based on the number of failed attempts.
330                if let Some(last_connection_attempt) = peer.last_connection_attempt
331                    && last_connection_attempt.elapsed() < Self::MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS
332                {
333                    return None;
334                }
335
336                // Get the peers address.
337                let addr = peer.listener_addr;
338                let attempt_no = peer.total_connection_attempts + 1;
339
340                // Start connection attempt.
341                debug!("(Re-)connecting to {peer_type} '{addr}' (attempt #{attempt_no})");
342                match self.router().connect(addr) {
343                    Ok(hdl) => Some(((addr, attempt_no, peer_type), hdl)),
344                    Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => None,
345                    Err(err) => {
346                        warn!("Could not initiate connection to {peer_type} at '{addr}' - {err}");
347                        None
348                    }
349                }
350            })
351            .unzip();
352
353        // Wait for all the connection attempts to complete.
354        for ((peer_addr, attempt_no, peer_type), result) in peer_info.into_iter().zip(join_all(hdls).await) {
355            Self::log_if_connect_error(
356                result,
357                &format!("Could not connect to {peer_type} at '{peer_addr}' (attempt #{attempt_no})"),
358            );
359        }
360    }
361
362    /// This function attempts to connect to any disconnected trusted peers.
363    async fn handle_trusted_peers(&self) {
364        self.try_connect_to_peers(self.router().get_trusted_candidate_peers().into_iter()).await;
365    }
366
367    /// This function updates the puzzle if network has updated.
368    fn handle_puzzle_request(&self) {
369        // No-op
370    }
371
372    // Remove addresses whose ban time has expired.
373    fn handle_banned_ips(&self) {
374        self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
375    }
376}