snarkos_node_router/
heartbeat.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    ConnectedPeer,
18    Outbound,
19    Router,
20    messages::{DisconnectReason, Message, PeerRequest},
21};
22use snarkvm::prelude::Network;
23
24use snarkos_node_tcp::P2P;
25
26use colored::Colorize;
27use rand::{Rng, prelude::IteratorRandom, rngs::OsRng};
28
/// Returns the larger of `a` and `b` (yielding `b` when the two are equal).
///
/// This hand-written `const fn` exists so the maximum can be computed in a `const`
/// context (e.g. when deriving associated constants), where `Ord::max` could not be
/// used at the time of writing; see Rust issue 92391:
/// https://github.com/rust-lang/rust/issues/92391.
pub const fn max(a: usize, b: usize) -> usize {
    if b >= a { b } else { a }
}
37
38#[async_trait]
39pub trait Heartbeat<N: Network>: Outbound<N> {
40    /// The duration in seconds to sleep in between heartbeat executions.
41    const HEARTBEAT_IN_SECS: u64 = 25; // 25 seconds
42    /// The minimum number of peers required to maintain connections with.
43    const MINIMUM_NUMBER_OF_PEERS: usize = 3;
44    /// The median number of peers to maintain connections with.
45    const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
46    /// The maximum number of peers permitted to maintain connections with.
47    const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
48    /// The maximum number of provers to maintain connections with.
49    const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
50    /// The amount of time an IP address is prohibited from connecting.
51    const IP_BAN_TIME_IN_SECS: u64 = 300;
52
53    /// Handles the heartbeat request.
54    async fn heartbeat(&self) {
55        self.safety_check_minimum_number_of_peers();
56        self.log_connected_peers();
57
58        // Remove any stale connected peers.
59        self.remove_stale_connected_peers();
60        // Remove the oldest connected peer.
61        self.remove_oldest_connected_peer();
62        // Keep the number of connected peers within the allowed range.
63        self.handle_connected_peers();
64        // Keep the bootstrap peers within the allowed range.
65        self.handle_bootstrap_peers().await;
66        // Keep the trusted peers connected.
67        self.handle_trusted_peers().await;
68        // Keep the puzzle request up to date.
69        self.handle_puzzle_request();
70        // Unban any addresses whose ban time has expired.
71        self.handle_banned_ips();
72    }
73
74    /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
75    /// This function performs safety checks on the setting for the minimum number of peers.
76    fn safety_check_minimum_number_of_peers(&self) {
77        // Perform basic sanity checks on the configuration for the number of peers.
78        assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
79        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
80        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
81        assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
82        assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
83    }
84
85    /// This function logs the connected peers.
86    fn log_connected_peers(&self) {
87        // Log the connected peers.
88        let connected_peers = self.router().connected_peers();
89        let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
90        match connected_peers.len() {
91            0 => debug!("No connected peers"),
92            1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
93            num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
94        }
95    }
96
97    /// This function removes any connected peers that have not communicated within the predefined time.
98    fn remove_stale_connected_peers(&self) {
99        // Check if any connected peer is stale.
100        for peer in self.router().get_connected_peers() {
101            // Disconnect if the peer has not communicated back within the predefined time.
102            let elapsed = peer.last_seen.elapsed();
103            if elapsed > Router::<N>::MAX_RADIO_SILENCE {
104                warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr);
105                // Disconnect from this peer.
106                self.router().disconnect(peer.listener_addr);
107            }
108        }
109    }
110
111    /// Returns a sorted vector of network addresses of all removable connected peers
112    /// where the first entry has the lowest priority and the last one the highest.
113    ///
114    /// Rules:
115    ///     - Trusted peers and bootstrap nodes are not removable.
116    ///     - Peers that we are currently syncing with are not removable.
117    ///     - Validators are considered higher priority than provers or clients.
118    ///     - Connections that have not been seen in a while are considered lower priority.
119    fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
120        // The hardcoded bootstrap nodes.
121        let bootstrap = self.router().bootstrap_peers();
122        // Are we synced already? (cache this here, so it does not need to be recomputed)
123        let is_block_synced = self.is_block_synced();
124
125        // Sort by priority, where lowest priority will be at the beginning
126        // of the vector.
127        // Note, that this gives equal priority to clients and provers, which
128        // we might want to change in the future.
129        let mut peers = self.router().filter_connected_peers(|peer| {
130            !peer.trusted
131                && !bootstrap.contains(&peer.listener_addr)
132                && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) // This peer is currently syncing from us.
133                && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) // We are currently syncing from this peer.
134        });
135        peers.sort_by_key(|peer| (peer.node_type.is_validator(), peer.last_seen));
136
137        peers
138    }
139
140    /// This function removes the peer that we have not heard from the longest,
141    /// to keep the connections fresh.
142    /// It only triggers if the router is above the minimum number of connected peers.
143    fn remove_oldest_connected_peer(&self) {
144        // Skip if the node is not requesting peers.
145        if !self.router().allow_external_peers() {
146            return;
147        }
148
149        // Skip if the router is at or below the minimum number of connected peers.
150        if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
151            return;
152        }
153
154        // Disconnect from the oldest connected peer, which is the first entry in the list
155        // of removable peers.
156        // Do nothing, if the list is empty.
157        if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
158            info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
159            let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
160            self.router().disconnect(oldest);
161        }
162    }
163
164    /// This function keeps the number of connected peers within the allowed range.
165    fn handle_connected_peers(&self) {
166        // Initialize an RNG.
167        let rng = &mut OsRng;
168
169        // Obtain the number of connected peers.
170        let num_connected = self.router().number_of_connected_peers();
171        // Obtain the number of connected provers.
172        let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
173
174        // Consider rotating more external peers every ~10 heartbeats.
175        let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0;
176        // Determine the maximum number of peers and provers to keep.
177        let (max_peers, max_provers) = if reduce_peers {
178            (Self::MEDIAN_NUMBER_OF_PEERS, 0)
179        } else {
180            (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS)
181        };
182
183        // Compute the number of surplus peers.
184        let num_surplus_peers = num_connected.saturating_sub(max_peers);
185        // Compute the number of surplus provers.
186        let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
187        // Compute the number of provers remaining connected.
188        let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
189        // Compute the number of surplus clients and validators.
190        let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
191
192        if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
193            debug!(
194                "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
195            );
196
197            // Retrieve the bootstrap peers.
198            let bootstrap = self.router().bootstrap_peers();
199
200            // Determine the provers to disconnect from.
201            let provers_to_disconnect = self
202                .router()
203                .filter_connected_peers(|peer| {
204                    peer.node_type.is_prover() && !peer.trusted && !bootstrap.contains(&peer.listener_addr)
205                })
206                .into_iter()
207                .choose_multiple(rng, num_surplus_provers);
208
209            // Determine the clients and validators to disconnect from.
210            let peers_to_disconnect = self
211                .get_removable_peers()
212                .into_iter()
213                .filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately
214                .take(num_surplus_clients_validators);
215
216            // Proceed to send disconnect requests to these peers.
217            for peer in peers_to_disconnect.chain(provers_to_disconnect) {
218                // TODO (howardwu): Remove this after specializing this function.
219                if self.router().node_type().is_prover() && peer.node_type.is_validator() {
220                    continue;
221                }
222
223                let peer_addr = peer.listener_addr;
224                info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
225                self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
226                // Disconnect from this peer.
227                self.router().disconnect(peer_addr);
228            }
229        }
230
231        // Obtain the number of connected peers.
232        let num_connected = self.router().number_of_connected_peers();
233        // Compute the number of deficit peers.
234        let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
235
236        if num_deficient > 0 {
237            // Initialize an RNG.
238            let rng = &mut OsRng;
239
240            // Attempt to connect to more peers.
241            for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
242                self.router().connect(peer_ip);
243            }
244
245            if self.router().allow_external_peers() {
246                // Request more peers from the connected peers.
247                for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
248                    self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
249                }
250            }
251        }
252    }
253
254    /// This function keeps the number of bootstrap peers within the allowed range.
255    async fn handle_bootstrap_peers(&self) {
256        // Split the bootstrap peers into connected and candidate lists.
257        let mut connected_bootstrap = Vec::new();
258        let mut candidate_bootstrap = Vec::new();
259        let connected_peers = self.router().connected_peers();
260        for bootstrap_ip in self.router().bootstrap_peers() {
261            match connected_peers.contains(&bootstrap_ip) {
262                true => connected_bootstrap.push(bootstrap_ip),
263                false => candidate_bootstrap.push(bootstrap_ip),
264            }
265        }
266        // If there are not enough connected bootstrap peers, connect to more.
267        if connected_bootstrap.is_empty() {
268            // Initialize an RNG.
269            let rng = &mut OsRng;
270            // Attempt to connect to a bootstrap peer.
271            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
272                match self.router().connect(peer_ip) {
273                    Some(hdl) => {
274                        let result = hdl.await;
275                        if let Err(err) = result {
276                            warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
277                        }
278                    }
279                    None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
280                }
281            }
282        }
283        // Determine if the node is connected to more bootstrap peers than allowed.
284        let num_surplus = connected_bootstrap.len().saturating_sub(1);
285        if num_surplus > 0 {
286            // Initialize an RNG.
287            let rng = &mut OsRng;
288            // Proceed to send disconnect requests to these bootstrap peers.
289            for peer_ip in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
290                info!("Disconnecting from '{peer_ip}' (exceeded maximum bootstrap)");
291                self.router().send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
292                // Disconnect from this peer.
293                self.router().disconnect(peer_ip);
294            }
295        }
296    }
297
298    /// This function attempts to connect to any disconnected trusted peers.
299    async fn handle_trusted_peers(&self) {
300        // Ensure that the trusted nodes are connected.
301        let handles: Vec<_> = self
302            .router()
303            .unconnected_trusted_peers()
304            .iter()
305            .filter_map(|listener_addr| {
306                debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`");
307                let hdl = self.router().connect(*listener_addr);
308                if hdl.is_none() {
309                    warn!("Could not initiate connection to trusted peer at `{listener_addr}`");
310                }
311                hdl
312            })
313            .collect();
314
315        for result in futures::future::join_all(handles).await {
316            if let Err(err) = result {
317                warn!("Could not connect to trusted peer: {err}");
318            }
319        }
320    }
321
322    /// This function updates the puzzle if network has updated.
323    fn handle_puzzle_request(&self) {
324        // No-op
325    }
326
327    // Remove addresses whose ban time has expired.
328    fn handle_banned_ips(&self) {
329        self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
330    }
331}