snarkos_node_router/
heartbeat.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    ConnectedPeer,
18    NodeType,
19    Outbound,
20    PeerPoolHandling,
21    Router,
22    bootstrap_peers,
23    messages::{DisconnectReason, Message, PeerRequest},
24};
25use snarkvm::prelude::Network;
26
27use snarkos_node_tcp::P2P;
28
29use colored::Colorize;
30use rand::{prelude::IteratorRandom, rngs::OsRng};
31
/// Returns the larger of the two given values (`b` when they are equal).
///
/// This is a `const fn` so that it can be used when defining associated constants.
/// See Rust issue 92391: https://github.com/rust-lang/rust/issues/92391.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
40
41#[async_trait]
42pub trait Heartbeat<N: Network>: Outbound<N> {
43    /// The duration in seconds to sleep in between heartbeat executions.
44    const HEARTBEAT_IN_SECS: u64 = 25; // 25 seconds
45    /// The minimum number of peers required to maintain connections with.
46    const MINIMUM_NUMBER_OF_PEERS: usize = 3;
47    /// The median number of peers to maintain connections with.
48    const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
49    /// The maximum number of peers permitted to maintain connections with.
50    const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
51    /// The maximum number of provers to maintain connections with.
52    const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
53    /// The amount of time an IP address is prohibited from connecting.
54    const IP_BAN_TIME_IN_SECS: u64 = 300;
55
56    /// Handles the heartbeat request.
57    async fn heartbeat(&self) {
58        self.safety_check_minimum_number_of_peers();
59        self.log_connected_peers();
60
61        // Remove any stale connected peers.
62        self.remove_stale_connected_peers();
63        // Remove the oldest connected peer.
64        self.remove_oldest_connected_peer();
65        // Keep the number of connected peers within the allowed range.
66        self.handle_connected_peers();
67        // Keep the bootstrap peers within the allowed range.
68        self.handle_bootstrap_peers().await;
69        // Keep the trusted peers connected.
70        self.handle_trusted_peers().await;
71        // Keep the puzzle request up to date.
72        self.handle_puzzle_request();
73        // Unban any addresses whose ban time has expired.
74        self.handle_banned_ips();
75    }
76
77    /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
78    /// This function performs safety checks on the setting for the minimum number of peers.
79    fn safety_check_minimum_number_of_peers(&self) {
80        // Perform basic sanity checks on the configuration for the number of peers.
81        assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
82        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
83        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
84        assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
85        assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
86    }
87
88    /// This function logs the connected peers.
89    fn log_connected_peers(&self) {
90        // Log the connected peers.
91        let connected_peers = self.router().connected_peers();
92        let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
93        match connected_peers.len() {
94            0 => warn!("No connected peers"),
95            1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
96            num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
97        }
98    }
99
100    /// This function removes any connected peers that have not communicated within the predefined time.
101    fn remove_stale_connected_peers(&self) {
102        // Check if any connected peer is stale.
103        for peer in self.router().get_connected_peers() {
104            // Disconnect if the peer has not communicated back within the predefined time.
105            let elapsed = peer.last_seen.elapsed();
106            if elapsed > Router::<N>::MAX_RADIO_SILENCE {
107                warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr);
108                // Disconnect from this peer.
109                self.router().disconnect(peer.listener_addr);
110            }
111        }
112    }
113
114    /// Returns a sorted vector of network addresses of all removable connected peers
115    /// where the first entry has the lowest priority and the last one the highest.
116    ///
117    /// Rules:
118    ///     - Trusted peers and bootstrap nodes are not removable.
119    ///     - Peers that we are currently syncing with are not removable.
120    ///     - Connections that have not been seen in a while are considered lower priority.
121    fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
122        // Are we synced already? (cache this here, so it does not need to be recomputed)
123        let is_block_synced = self.is_block_synced();
124
125        // Sort by priority, where lowest priority will be at the beginning
126        // of the vector.
127        // Note, that this gives equal priority to clients and provers, which
128        // we might want to change in the future.
129        let mut peers = self.router().filter_connected_peers(|peer| {
130            !peer.trusted
131                && peer.node_type != NodeType::BootstrapClient
132                && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) // This peer is currently syncing from us.
133                && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) // We are currently syncing from this peer.
134        });
135        peers.sort_by_key(|peer| peer.last_seen);
136
137        peers
138    }
139
140    /// This function removes the peer that we have not heard from the longest,
141    /// to keep the connections fresh.
142    /// It only triggers if the router is above the minimum number of connected peers.
143    fn remove_oldest_connected_peer(&self) {
144        // Skip if the node is not requesting peers.
145        if self.router().trusted_peers_only() {
146            return;
147        }
148
149        // Skip if the router is at or below the minimum number of connected peers.
150        if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
151            return;
152        }
153
154        // Disconnect from the oldest connected peer, which is the first entry in the list
155        // of removable peers.
156        // Do nothing, if the list is empty.
157        if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
158            info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
159            let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
160            self.router().disconnect(oldest);
161        }
162    }
163
164    /// This function keeps the number of connected peers within the allowed range.
165    fn handle_connected_peers(&self) {
166        // Initialize an RNG.
167        let rng = &mut OsRng;
168
169        // Obtain the number of connected peers.
170        let num_connected = self.router().number_of_connected_peers();
171        // Obtain the number of connected provers.
172        let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
173
174        // Determine the maximum number of peers and provers to keep.
175        let (max_peers, max_provers) = (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS);
176
177        // Compute the number of surplus peers.
178        let num_surplus_peers = num_connected.saturating_sub(max_peers);
179        // Compute the number of surplus provers.
180        let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
181        // Compute the number of provers remaining connected.
182        let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
183        // Compute the number of surplus clients and validators.
184        let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
185
186        if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
187            debug!(
188                "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
189            );
190
191            // Determine the provers to disconnect from.
192            let provers_to_disconnect = self
193                .router()
194                .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted)
195                .into_iter()
196                .choose_multiple(rng, num_surplus_provers);
197
198            // Determine the clients and validators to disconnect from.
199            let peers_to_disconnect = self
200                .get_removable_peers()
201                .into_iter()
202                .filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately
203                .take(num_surplus_clients_validators);
204
205            // Proceed to send disconnect requests to these peers.
206            for peer in peers_to_disconnect.chain(provers_to_disconnect) {
207                // TODO (howardwu): Remove this after specializing this function.
208                if self.router().node_type().is_prover() {
209                    continue;
210                }
211
212                let peer_addr = peer.listener_addr;
213                info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
214                self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
215                // Disconnect from this peer.
216                self.router().disconnect(peer_addr);
217            }
218        }
219
220        // Obtain the number of connected peers.
221        let num_connected = self.router().number_of_connected_peers();
222        // Compute the number of deficit peers.
223        let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
224
225        if num_deficient > 0 {
226            // Initialize an RNG.
227            let rng = &mut OsRng;
228
229            // Attempt to connect to more peers, separately choosing from those at a greater block
230            // height, and those whose height is lower or unknown to us.
231            let own_height = self.router().ledger.latest_block_height();
232            let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
233                .router()
234                .get_candidate_peers()
235                .into_iter()
236                .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
237            // We may not know of half of `num_deficient` candidates; account for it using `min`.
238            let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
239            for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) {
240                self.router().connect(peer.listener_addr);
241            }
242            for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) {
243                self.router().connect(peer.listener_addr);
244            }
245
246            if !self.router().trusted_peers_only() {
247                // Request more peers from the connected peers.
248                for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
249                    self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
250                }
251            }
252        }
253    }
254
255    /// This function keeps the number of bootstrap peers within the allowed range.
256    async fn handle_bootstrap_peers(&self) {
257        // Return early if we are in trusted peers only mode.
258        if self.router().trusted_peers_only() {
259            return;
260        }
261        // Split the bootstrap peers into connected and candidate lists.
262        let mut candidate_bootstrap = Vec::new();
263        let connected_bootstrap =
264            self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
265        for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) {
266            if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
267                candidate_bootstrap.push(bootstrap_ip);
268            }
269        }
270        // If there are not enough connected bootstrap peers, connect to more.
271        if connected_bootstrap.is_empty() {
272            // Initialize an RNG.
273            let rng = &mut OsRng;
274            // Attempt to connect to a bootstrap peer.
275            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
276                match self.router().connect(peer_ip) {
277                    Some(hdl) => {
278                        let result = hdl.await;
279                        if let Err(err) = result {
280                            warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
281                        }
282                    }
283                    None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
284                }
285            }
286        }
287        // Determine if the node is connected to more bootstrap peers than allowed.
288        let num_surplus = connected_bootstrap.len().saturating_sub(1);
289        if num_surplus > 0 {
290            // Initialize an RNG.
291            let rng = &mut OsRng;
292            // Proceed to send disconnect requests to these bootstrap peers.
293            for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
294                info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
295                self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
296                // Disconnect from this peer.
297                self.router().disconnect(peer.listener_addr);
298            }
299        }
300    }
301
302    /// This function attempts to connect to any disconnected trusted peers.
303    async fn handle_trusted_peers(&self) {
304        // Ensure that the trusted nodes are connected.
305        let handles: Vec<_> = self
306            .router()
307            .unconnected_trusted_peers()
308            .iter()
309            .filter_map(|listener_addr| {
310                debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`");
311                let hdl = self.router().connect(*listener_addr);
312                if hdl.is_none() {
313                    warn!("Could not initiate connection to trusted peer at `{listener_addr}`");
314                }
315                hdl
316            })
317            .collect();
318
319        for result in futures::future::join_all(handles).await {
320            if let Err(err) = result {
321                warn!("Could not connect to trusted peer: {err}");
322            }
323        }
324    }
325
326    /// This function updates the puzzle if network has updated.
327    fn handle_puzzle_request(&self) {
328        // No-op
329    }
330
331    // Remove addresses whose ban time has expired.
332    fn handle_banned_ips(&self) {
333        self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
334    }
335}