snarkos_node_router/
heartbeat.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Outbound,
18    Router,
19    messages::{DisconnectReason, Message, PeerRequest},
20};
21use snarkvm::prelude::Network;
22
23use colored::Colorize;
24use rand::{Rng, prelude::IteratorRandom, rngs::OsRng};
25
/// Returns the larger of `a` and `b` (returns `b` when they are equal).
///
/// This is a `const fn` so that it can be used when initializing constants,
/// such as `Heartbeat::MEDIAN_NUMBER_OF_PEERS`.
/// See Rust issue 92391: https://github.com/rust-lang/rust/issues/92391.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
34
35pub trait Heartbeat<N: Network>: Outbound<N> {
    /// The duration in seconds to sleep in between heartbeat executions.
    const HEARTBEAT_IN_SECS: u64 = 25; // 25 seconds
    /// The minimum number of peers required to maintain connections with.
    ///
    /// The periodic peer refresh in `remove_oldest_connected_peer` is skipped
    /// while the number of connected peers is at or below this value.
    const MINIMUM_NUMBER_OF_PEERS: usize = 3;
    /// The median number of peers to maintain connections with.
    ///
    /// Defaults to half of `MAXIMUM_NUMBER_OF_PEERS` (integer division), but never less than
    /// `MINIMUM_NUMBER_OF_PEERS`; with the default values this is `10`.
    /// `handle_connected_peers` attempts to connect to more peers whenever the number of
    /// connected peers is below this value.
    const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
    /// The maximum number of peers permitted to maintain connections with.
    const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
    /// The maximum number of provers to maintain connections with.
    ///
    /// Defaults to a quarter of `MAXIMUM_NUMBER_OF_PEERS` (integer division);
    /// with the default values this is `5`.
    const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
    /// The amount of time, in seconds, an IP address is prohibited from connecting.
    ///
    /// Passed to `remove_old_bans` by `handle_banned_ips` on every heartbeat.
    const IP_BAN_TIME_IN_SECS: u64 = 300;
48
    /// Handles the heartbeat request.
    ///
    /// Runs a single heartbeat cycle: the peer-count configuration is checked, the connected
    /// peers are logged, and then each maintenance step below is executed in order.
    ///
    /// # Panics
    ///
    /// Panics if `safety_check_minimum_number_of_peers` detects an inconsistent
    /// configuration of the peer-count constants.
    fn heartbeat(&self) {
        // Validate the peer-count constants before any step relies on them.
        self.safety_check_minimum_number_of_peers();
        self.log_connected_peers();

        // Remove any stale connected peers.
        self.remove_stale_connected_peers();
        // Remove the oldest connected peer.
        self.remove_oldest_connected_peer();
        // Keep the number of connected peers within the allowed range.
        self.handle_connected_peers();
        // Keep the bootstrap peers within the allowed range.
        self.handle_bootstrap_peers();
        // Keep the trusted peers connected.
        self.handle_trusted_peers();
        // Keep the puzzle request up to date.
        self.handle_puzzle_request();
        // Unban any addresses whose ban time has expired.
        self.handle_banned_ips();
    }
69
70    /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
71    /// This function performs safety checks on the setting for the minimum number of peers.
72    fn safety_check_minimum_number_of_peers(&self) {
73        // Perform basic sanity checks on the configuration for the number of peers.
74        assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
75        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
76        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
77        assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
78        assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
79    }
80
81    /// This function logs the connected peers.
82    fn log_connected_peers(&self) {
83        // Log the connected peers.
84        let connected_peers = self.router().connected_peers();
85        let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
86        match connected_peers.len() {
87            0 => debug!("No connected peers"),
88            1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
89            num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
90        }
91    }
92
93    /// This function removes any connected peers that have not communicated within the predefined time.
94    fn remove_stale_connected_peers(&self) {
95        // Check if any connected peer is stale.
96        for peer in self.router().get_connected_peers() {
97            // Disconnect if the peer has not communicated back within the predefined time.
98            let elapsed = peer.last_seen().elapsed().as_secs();
99            if elapsed > Router::<N>::RADIO_SILENCE_IN_SECS {
100                warn!("Peer {} has not communicated in {elapsed} seconds", peer.ip());
101                // Disconnect from this peer.
102                self.router().disconnect(peer.ip());
103            }
104        }
105    }
106
107    /// This function removes the oldest connected peer, to keep the connections fresh.
108    /// This function only triggers if the router is above the minimum number of connected peers.
109    fn remove_oldest_connected_peer(&self) {
110        // Skip if the router is at or below the minimum number of connected peers.
111        if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
112            return;
113        }
114
115        // Skip if the node is not requesting peers.
116        if !self.router().allow_external_peers() {
117            return;
118        }
119
120        // Retrieve the trusted peers.
121        let trusted = self.router().trusted_peers();
122        // Retrieve the bootstrap peers.
123        let bootstrap = self.router().bootstrap_peers();
124
125        // Find the oldest connected peer, that is neither trusted nor a bootstrap peer.
126        let oldest_peer = self
127            .router()
128            .get_connected_peers()
129            .iter()
130            .filter(|peer| !trusted.contains(&peer.ip()) && !bootstrap.contains(&peer.ip()))
131            .filter(|peer| !self.router().cache.contains_inbound_block_request(&peer.ip())) // Skip if the peer is syncing.
132            .filter(|peer| self.is_block_synced() || self.router().cache.num_outbound_block_requests(&peer.ip()) == 0) // Skip if you are syncing from this peer.
133            .min_by_key(|peer| peer.last_seen())
134            .map(|peer| peer.ip());
135
136        // Disconnect from the oldest connected peer, if one exists.
137        if let Some(oldest) = oldest_peer {
138            info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
139            let _ = self.send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
140            // Disconnect from this peer.
141            self.router().disconnect(oldest);
142        }
143    }
144
145    /// TODO (howardwu): If the node is a validator, keep the validator.
146    /// This function keeps the number of connected peers within the allowed range.
147    fn handle_connected_peers(&self) {
148        // Initialize an RNG.
149        let rng = &mut OsRng;
150
151        // Obtain the number of connected peers.
152        let num_connected = self.router().number_of_connected_peers();
153        // Obtain the number of connected provers.
154        let num_connected_provers = self.router().number_of_connected_provers();
155
156        // Consider rotating more external peers every ~10 heartbeats.
157        let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0;
158        // Determine the maximum number of peers and provers to keep.
159        let (max_peers, max_provers) = if reduce_peers {
160            (Self::MEDIAN_NUMBER_OF_PEERS, 0)
161        } else {
162            (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS)
163        };
164
165        // Compute the number of surplus peers.
166        let num_surplus_peers = num_connected.saturating_sub(max_peers);
167        // Compute the number of surplus provers.
168        let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
169        // Compute the number of provers remaining connected.
170        let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
171        // Compute the number of surplus clients and validators.
172        let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
173
174        if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
175            debug!(
176                "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
177            );
178
179            // Retrieve the trusted peers.
180            let trusted = self.router().trusted_peers();
181            // Retrieve the bootstrap peers.
182            let bootstrap = self.router().bootstrap_peers();
183
184            // Determine the provers to disconnect from.
185            let prover_ips_to_disconnect = self
186                .router()
187                .connected_provers()
188                .into_iter()
189                .filter(|peer_ip| !trusted.contains(peer_ip) && !bootstrap.contains(peer_ip))
190                .choose_multiple(rng, num_surplus_provers);
191
192            // TODO (howardwu): As a validator, prioritize disconnecting from clients.
193            //  Remove RNG, pick the `n` oldest nodes.
194            // Determine the clients and validators to disconnect from.
195            let peer_ips_to_disconnect = self
196                .router()
197                .get_connected_peers()
198                .into_iter()
199                .filter_map(|peer| {
200                    let peer_ip = peer.ip();
201                    if !peer.is_prover() && // Skip if the peer is a prover.
202                       !trusted.contains(&peer_ip) && // Skip if the peer is trusted.
203                       !bootstrap.contains(&peer_ip) && // Skip if the peer is a bootstrap peer.
204                       // Skip if you are syncing from this peer.
205                       (self.is_block_synced() || (!self.is_block_synced() && self.router().cache.num_outbound_block_requests(&peer.ip()) == 0))
206                    {
207                        Some(peer_ip)
208                    } else {
209                        None
210                    }
211                })
212                .choose_multiple(rng, num_surplus_clients_validators);
213
214            // Proceed to send disconnect requests to these peers.
215            for peer_ip in peer_ips_to_disconnect.into_iter().chain(prover_ips_to_disconnect) {
216                // TODO (howardwu): Remove this after specializing this function.
217                if self.router().node_type().is_prover() {
218                    if let Some(peer) = self.router().get_connected_peer(&peer_ip) {
219                        if peer.node_type().is_validator() {
220                            continue;
221                        }
222                    }
223                }
224
225                info!("Disconnecting from '{peer_ip}' (exceeded maximum connections)");
226                self.send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
227                // Disconnect from this peer.
228                self.router().disconnect(peer_ip);
229            }
230        }
231
232        // Obtain the number of connected peers.
233        let num_connected = self.router().number_of_connected_peers();
234        // Compute the number of deficit peers.
235        let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
236
237        if num_deficient > 0 {
238            // Initialize an RNG.
239            let rng = &mut OsRng;
240
241            // Attempt to connect to more peers.
242            for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
243                self.router().connect(peer_ip);
244            }
245
246            if self.router().allow_external_peers() {
247                // Request more peers from the connected peers.
248                for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
249                    self.send(peer_ip, Message::PeerRequest(PeerRequest));
250                }
251            }
252        }
253    }
254
255    /// This function keeps the number of bootstrap peers within the allowed range.
256    fn handle_bootstrap_peers(&self) {
257        // Split the bootstrap peers into connected and candidate lists.
258        let mut connected_bootstrap = Vec::new();
259        let mut candidate_bootstrap = Vec::new();
260        for bootstrap_ip in self.router().bootstrap_peers() {
261            match self.router().is_connected(&bootstrap_ip) {
262                true => connected_bootstrap.push(bootstrap_ip),
263                false => candidate_bootstrap.push(bootstrap_ip),
264            }
265        }
266        // If there are not enough connected bootstrap peers, connect to more.
267        if connected_bootstrap.is_empty() {
268            // Initialize an RNG.
269            let rng = &mut OsRng;
270            // Attempt to connect to a bootstrap peer.
271            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
272                self.router().connect(peer_ip);
273            }
274        }
275        // Determine if the node is connected to more bootstrap peers than allowed.
276        let num_surplus = connected_bootstrap.len().saturating_sub(1);
277        if num_surplus > 0 {
278            // Initialize an RNG.
279            let rng = &mut OsRng;
280            // Proceed to send disconnect requests to these bootstrap peers.
281            for peer_ip in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
282                info!("Disconnecting from '{peer_ip}' (exceeded maximum bootstrap)");
283                self.send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
284                // Disconnect from this peer.
285                self.router().disconnect(peer_ip);
286            }
287        }
288    }
289
290    /// This function attempts to connect to any disconnected trusted peers.
291    fn handle_trusted_peers(&self) {
292        // Ensure that the trusted nodes are connected.
293        for peer_ip in self.router().trusted_peers() {
294            // If the peer is not connected, attempt to connect to it.
295            if !self.router().is_connected(peer_ip) {
296                // Attempt to connect to the trusted peer.
297                self.router().connect(*peer_ip);
298            }
299        }
300    }
301
    /// This function updates the puzzle if the network has updated.
    ///
    /// It is called on every heartbeat. The default implementation does nothing;
    /// implementors that need an up-to-date puzzle can override it.
    fn handle_puzzle_request(&self) {
        // No-op
    }
306
    /// This function removes addresses whose ban time has expired.
    ///
    /// The ban duration passed to `remove_old_bans` is `IP_BAN_TIME_IN_SECS`.
    fn handle_banned_ips(&self) {
        self.tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
    }
311}