Skip to main content

snarkos_node_bft/
gateway.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19    CONTEXT,
20    MAX_BATCH_DELAY,
21    MEMORY_POOL_PORT,
22    Worker,
23    events::{DisconnectReason, EventCodec, PrimaryPing},
24    helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
25    spawn_blocking,
26};
27use smol_str::SmolStr;
28use snarkos_account::Account;
29use snarkos_node_bft_events::{
30    BlockRequest,
31    BlockResponse,
32    CertificateRequest,
33    CertificateResponse,
34    ChallengeRequest,
35    ChallengeResponse,
36    DataBlocks,
37    Event,
38    EventTrait,
39    TransmissionRequest,
40    TransmissionResponse,
41    ValidatorsRequest,
42    ValidatorsResponse,
43};
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_network::{
46    ConnectionMode,
47    NodeType,
48    Peer,
49    PeerPoolHandling,
50    Resolver,
51    bootstrap_peers,
52    get_repo_commit_hash,
53    log_repo_sha_comparison,
54    shorten_snarkos_sha,
55};
56use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
57use snarkos_node_tcp::{
58    Config,
59    ConnectError,
60    Connection,
61    ConnectionSide,
62    P2P,
63    Tcp,
64    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
65};
66use snarkos_utilities::NodeDataDir;
67use snarkvm::{
68    console::prelude::*,
69    ledger::{
70        committee::Committee,
71        narwhal::{BatchHeader, Data},
72    },
73    prelude::{Address, Field},
74    utilities::flatten_error,
75};
76
77use colored::Colorize;
78use futures::{SinkExt, future::join_all};
79use indexmap::IndexMap;
80#[cfg(feature = "locktick")]
81use locktick::parking_lot::{Mutex, RwLock};
82#[cfg(not(feature = "locktick"))]
83use parking_lot::{Mutex, RwLock};
84use rand::seq::{IteratorRandom, SliceRandom};
85use std::{
86    collections::{HashMap, HashSet},
87    future::Future,
88    io,
89    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
90    sync::Arc,
91    time::Duration,
92};
93use tokio::{
94    net::TcpStream,
95    sync::{OnceCell, oneshot},
96    task::{self, JoinHandle},
97};
98use tokio_stream::StreamExt;
99use tokio_util::codec::Framed;
100
/// The maximum interval of events to cache.
/// Derived from `MAX_BATCH_DELAY`; handed to the cache when counting inbound events per peer.
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds
/// The maximum interval of requests to cache.
/// Derived from `MAX_BATCH_DELAY`; handed to the cache when counting repeated certificate,
/// transmission, and block requests.
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds

/// The maximum number of connection attempts in an interval.
/// Not compiled into test builds.
#[cfg(not(test))]
const MAX_CONNECTION_ATTEMPTS: usize = 10;

/// The maximum number of validators to send in a validators response event.
pub const MAX_VALIDATORS_TO_SEND: usize = 200;

/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
/// Expressed in seconds. Not compiled into test builds.
#[cfg(not(test))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;

/// The amount of time an IP address is prohibited from connecting.
/// Expressed in seconds.
const IP_BAN_TIME_IN_SECS: u64 = 300;
119
/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
    /// Sends the given event to the peer with the given listener IP.
    ///
    /// On success, the returned [`oneshot::Receiver`] resolves with the I/O result of the delivery.
    /// NOTE(review): `None` presumably means the event could not be queued at all - confirm
    /// against the implementors, which are not visible from this trait definition.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
    /// Broadcasts the given event.
    ///
    /// NOTE(review): the set of peers that receive the event is decided by the implementor.
    fn broadcast(&self, event: Event<N>);
}
127
/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
///
/// This is a thin handle around a reference-counted [`InnerGateway`]:
/// every clone shares the same underlying state.
#[derive(Clone)]
pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);
132
133impl<N: Network> Deref for Gateway<N> {
134    type Target = Arc<InnerGateway<N>>;
135
136    fn deref(&self) -> &Self::Target {
137        &self.0
138    }
139}
140
/// The shared state behind a [`Gateway`] handle.
pub struct InnerGateway<N: Network> {
    /// The account of the node.
    account: Account<N>,
    /// The storage; consulted for the current round when authorizing validators.
    storage: Storage<N>,
    /// The ledger service; used for committee lookups and for serving block requests.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack.
    tcp: Tcp,
    /// The cache of recent inbound events and outstanding requests, used for rate limiting.
    cache: Cache<N>,
    /// The resolver, which maps between listener addresses and (ambiguous) peer addresses.
    resolver: RwLock<Resolver<N>>,
    /// The collection of both candidate and connected peers.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// The validator telemetry (only present with the `telemetry` feature).
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender; set once when the gateway is run.
    primary_sender: OnceCell<PrimarySender<N>>,
    /// The worker senders, keyed by worker ID; set once when the gateway is run.
    worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
    /// The sync sender; only set if one was provided when the gateway is run.
    sync_sender: OnceCell<SyncSender<N>>,
    /// The spawned handles.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// The node's data directory; provides the path of the gateway peer cache.
    node_data_dir: NodeDataDir,
    /// If the flag is set, the node will only connect to trusted peers.
    trusted_peers_only: bool,
    /// The development ID; `Some` if and only if the node runs in development mode.
    dev: Option<u16>,
}
173
174impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
175    const MAXIMUM_POOL_SIZE: usize = 200;
176    const OWNER: &str = CONTEXT;
177    const PEER_SLASHING_COUNT: usize = 20;
178
179    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
180        &self.peer_pool
181    }
182
183    fn resolver(&self) -> &RwLock<Resolver<N>> {
184        &self.resolver
185    }
186
187    fn is_dev(&self) -> bool {
188        self.dev.is_some()
189    }
190
191    fn trusted_peers_only(&self) -> bool {
192        self.trusted_peers_only
193    }
194
195    fn node_type(&self) -> NodeType {
196        NodeType::Validator
197    }
198}
199
200impl<N: Network> Gateway<N> {
201    /// Initializes a new gateway.
202    #[allow(clippy::too_many_arguments)]
203    pub fn new(
204        account: Account<N>,
205        storage: Storage<N>,
206        ledger: Arc<dyn LedgerService<N>>,
207        ip: Option<SocketAddr>,
208        trusted_validators: &[SocketAddr],
209        trusted_peers_only: bool,
210        node_data_dir: NodeDataDir,
211        dev: Option<u16>,
212    ) -> Result<Self> {
213        // Initialize the gateway IP.
214        let ip = match (ip, dev) {
215            (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
216            (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
217            (Some(ip), _) => ip,
218        };
219        // Initialize the TCP stack.
220        //
221        // The 10x multiplier allows for more TCP connections than the maximum
222        // committee size to prevent "connection refused" errors when two nodes
223        // simultaneous attempt to connect to each other. Note, that later,
224        // during handshake, the Gateway applies its own limit to the number of
225        // active connections and removes duplicates.
226        let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size() * 10));
227
228        // Prepare the collection of the initial peers.
229        let mut initial_peers = HashMap::new();
230
231        // Load entries from the validator cache (if present and if we are not in trusted peers only mode).
232        if !trusted_peers_only {
233            let cached_peers = Self::load_cached_peers(&node_data_dir.gateway_peer_cache_path())?;
234            for addr in cached_peers {
235                initial_peers.insert(addr, Peer::new_candidate(addr, false));
236            }
237        }
238
239        // Add the trusted peers to the list of the initial peers; this may promote
240        // some of the cached validators to trusted ones.
241        initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));
242
243        // Return the gateway.
244        Ok(Self(Arc::new(InnerGateway {
245            account,
246            storage,
247            ledger,
248            tcp,
249            cache: Default::default(),
250            resolver: Default::default(),
251            peer_pool: RwLock::new(initial_peers),
252            #[cfg(feature = "telemetry")]
253            validator_telemetry: Default::default(),
254            primary_sender: Default::default(),
255            worker_senders: Default::default(),
256            sync_sender: Default::default(),
257            handles: Default::default(),
258            node_data_dir,
259            trusted_peers_only,
260            dev,
261        })))
262    }
263
264    /// Run the gateway.
265    pub async fn run(
266        &self,
267        primary_sender: PrimarySender<N>,
268        worker_senders: IndexMap<u8, WorkerSender<N>>,
269        sync_sender: Option<SyncSender<N>>,
270    ) {
271        debug!("Starting the gateway for the memory pool...");
272
273        // Set the primary sender.
274        self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");
275
276        // Set the worker senders.
277        self.worker_senders.set(worker_senders).expect("The worker senders are already set");
278
279        // If the sync sender was provided, set the sync sender.
280        if let Some(sync_sender) = sync_sender {
281            self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
282        }
283
284        // Enable the TCP protocols.
285        self.enable_handshake().await;
286        self.enable_reading().await;
287        self.enable_writing().await;
288        self.enable_disconnect().await;
289        self.enable_on_connect().await;
290
291        // Spawn a loop for periodic metrics.
292        #[cfg(feature = "metrics")]
293        {
294            let gateway = self.clone();
295            self.spawn(async move {
296                loop {
297                    tokio::time::sleep(Duration::from_secs(1)).await;
298                    gateway.update_metrics();
299                }
300            });
301        }
302
303        // Enable the TCP listener. Note: This must be called after the above protocols.
304        let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
305        debug!("Listening for validator connections at address {listen_addr:?}");
306
307        // Initialize the heartbeat.
308        self.initialize_heartbeat();
309
310        info!("Started the gateway for the memory pool at '{}'", self.local_ip());
311    }
312}
313
314// Dynamic rate limiting.
315impl<N: Network> Gateway<N> {
316    /// The current maximum committee size.
317    fn max_committee_size(&self) -> usize {
318        self.ledger
319            .current_committee()
320            .map_or_else(|_e| Committee::<N>::max_committee_size() as usize, |committee| committee.num_members())
321    }
322
323    /// The maximum number of events to cache.
324    fn max_cache_events(&self) -> usize {
325        self.max_cache_transmissions()
326    }
327
328    /// The maximum number of certificate requests to cache.
329    fn max_cache_certificates(&self) -> usize {
330        2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
331    }
332
333    /// The maximum number of transmission requests to cache.
334    fn max_cache_transmissions(&self) -> usize {
335        self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
336    }
337
338    /// The maximum number of duplicates for any particular request.
339    fn max_cache_duplicates(&self) -> usize {
340        self.max_committee_size().pow(2)
341    }
342}
343
344#[async_trait]
345impl<N: Network> CommunicationService for Gateway<N> {
346    /// The message type.
347    type Message = Event<N>;
348
349    /// Prepares a block request to be sent.
350    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
351        debug_assert!(start_height < end_height, "Invalid block request format");
352        Event::BlockRequest(BlockRequest { start_height, end_height })
353    }
354
355    /// Sends the given message to specified peer.
356    ///
357    /// This function returns as soon as the message is queued to be sent,
358    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
359    /// which can be used to determine when and whether the message has been delivered.
360    async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
361        Transport::send(self, peer_ip, message).await
362    }
363}
364
365impl<N: Network> Gateway<N> {
366    /// Returns the account of the node.
367    pub fn account(&self) -> &Account<N> {
368        &self.account
369    }
370
371    /// Returns the dev identifier of the node.
372    pub fn dev(&self) -> Option<u16> {
373        self.dev
374    }
375
376    /// Returns a reference to the ledger.
377    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
378        &self.ledger
379    }
380
381    /// Returns the resolver.
382    pub fn resolver(&self) -> &RwLock<Resolver<N>> {
383        &self.resolver
384    }
385
386    /// Returns the listener IP address from the (ambiguous) peer address.
387    pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
388        self.resolver.read().get_listener(*connected_addr)
389    }
390
391    /// Returns the validator telemetry.
392    #[cfg(feature = "telemetry")]
393    pub fn validator_telemetry(&self) -> &Telemetry<N> {
394        &self.validator_telemetry
395    }
396
397    /// Returns the primary sender.
398    pub fn primary_sender(&self) -> &PrimarySender<N> {
399        self.primary_sender.get().expect("Primary sender not set in gateway")
400    }
401
402    /// Returns the number of workers.
403    pub fn num_workers(&self) -> u8 {
404        u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
405            .expect("Too many workers")
406    }
407
408    /// Returns the worker sender for the given worker ID.
409    pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
410        self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
411    }
412
413    /// Returns `true` if the given peer IP is an authorized validator.
414    pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
415        // If the peer IP is in the trusted validators, return early.
416        if self.trusted_peers().contains(&ip) {
417            return true;
418        }
419        // Retrieve the Aleo address of the peer IP.
420        match self.resolve_to_aleo_addr(ip) {
421            // Determine if the peer IP is an authorized validator.
422            Some(address) => self.is_authorized_validator_address(address),
423            None => {
424                warn!("{CONTEXT} Could not resolve the Aleo address for '{ip}'");
425                false
426            }
427        }
428    }
429
430    /// Returns `true` if the given address is an authorized validator.
431    pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
432        // Determine if the validator address is a member of the committee lookback,
433        // the current committee, or the previous committee lookbacks.
434        // We allow leniency in this validation check in order to accommodate these two scenarios:
435        //  1. New validators should be able to connect immediately once bonded as a committee member.
436        //  2. Existing validators must remain connected until they are no longer bonded as a committee member.
437        //     (i.e. meaning they must stay online until the next block has been produced)
438
439        // Determine if the validator is in the current committee with lookback.
440        if self
441            .ledger
442            .get_committee_lookback_for_round(self.storage.current_round())
443            .is_ok_and(|committee| committee.is_committee_member(validator_address))
444        {
445            return true;
446        }
447
448        // Determine if the validator is in the latest committee on the ledger.
449        if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
450            return true;
451        }
452
453        // Retrieve the previous block height to consider from the sync tolerance.
454        let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
455        // Determine if the validator is in any of the previous committee lookbacks.
456        match self.ledger.get_block_round(previous_block_height) {
457            Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
458                self.ledger
459                    .get_committee_lookback_for_round(round)
460                    .is_ok_and(|committee| committee.is_committee_member(validator_address))
461            }),
462            Err(_) => false,
463        }
464    }
465
466    /// Returns the list of connected addresses.
467    pub fn connected_addresses(&self) -> HashSet<Address<N>> {
468        self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect()
469    }
470
471    /// Ensure the peer is allowed to connect.
472    fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
473        // Ensure the peer IP is not this node.
474        if self.is_local_ip(listener_addr) {
475            return Err(DisconnectReason::SelfConnect);
476        }
477
478        Ok(())
479    }
480
481    /// Updates the connection metrics for the gateway. Ignores the bootstrap clients.
482    #[cfg(feature = "metrics")]
483    fn update_metrics(&self) {
484        if let Some(count) = self.number_of_connected_validators() {
485            metrics::gauge(metrics::bft::CONNECTED, count as f64);
486        }
487        if let Some(count) = self.number_of_connecting_peers() {
488            metrics::gauge(metrics::bft::CONNECTING, count as f64);
489        }
490    }
491
492    /// Inserts the given peer into the connected peers. This is only used in testing.
493    #[cfg(test)]
494    pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
495        // Adds a bidirectional map between the listener address and (ambiguous) peer address.
496        self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
497        // Add a transmission for this peer in the connected peers.
498        self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
499        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
500            peer.upgrade_to_connected(
501                peer_addr,
502                peer_ip.port(),
503                address,
504                NodeType::Validator,
505                0,
506                get_repo_commit_hash(),
507                ConnectionMode::Gateway,
508            );
509        }
510    }
511
512    /// Sends the given event to specified peer.
513    ///
514    /// This function returns as soon as the event is queued to be sent,
515    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
516    /// which can be used to determine when and whether the event has been delivered.
517    fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
518        // Resolve the listener IP to the (ambiguous) peer address.
519        let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
520            warn!("Unable to resolve the listener IP address '{peer_ip}'");
521            return None;
522        };
523        // Retrieve the event name.
524        let name = event.name();
525        // Send the event to the peer.
526        trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
527        let result = self.unicast(peer_addr, event);
528        // If the event was unable to be sent, disconnect.
529        if let Err(err) = &result {
530            warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {err:?}");
531            debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
532            self.disconnect(peer_ip);
533        }
534        result.ok()
535    }
536
537    /// Handles the inbound event from the peer. The returned value indicates whether
538    /// the connection is still active, and errors cause a disconnect once they are
539    /// propagated to the caller.
540    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
541        // Retrieve the listener IP for the peer.
542        let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
543            // No longer connected to the peer.
544            trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
545            return Ok(false);
546        };
547        // Ensure that the peer is an authorized committee member or a bootstrapper.
548        if !(self.is_authorized_validator_ip(peer_ip)
549            || self
550                .get_connected_peer(peer_ip)
551                .map(|peer| peer.node_type == NodeType::BootstrapClient)
552                .unwrap_or(false))
553        {
554            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
555        }
556        // Drop the peer, if they have exceeded the rate limit (i.e. they are requesting too much from us).
557        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
558        if num_events >= self.max_cache_events() {
559            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
560        }
561        // Rate limit for duplicate requests.
562        match event {
563            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
564                // Retrieve the certificate ID.
565                let certificate_id = match &event {
566                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
567                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
568                    _ => unreachable!(),
569                };
                // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
571                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
572                if num_events >= self.max_cache_duplicates() {
573                    return Ok(true);
574                }
575            }
576            Event::TransmissionRequest(TransmissionRequest { transmission_id })
577            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
                // Skip processing this transmission if the rate limit was exceeded (i.e. someone is spamming a specific transmission).
579                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
580                if num_events >= self.max_cache_duplicates() {
581                    return Ok(true);
582                }
583            }
584            Event::BlockRequest(_) => {
585                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
586                if num_events >= self.max_cache_duplicates() {
587                    return Ok(true);
588                }
589            }
590            _ => {}
591        }
592        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
593
594        // This match statement handles the inbound event by deserializing the event,
595        // checking the event is valid, and then calling the appropriate (trait) handler.
596        match event {
597            Event::BatchPropose(batch_propose) => {
598                // Send the batch propose to the primary.
599                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
600                Ok(true)
601            }
602            Event::BatchSignature(batch_signature) => {
603                // Send the batch signature to the primary.
604                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
605                Ok(true)
606            }
607            Event::BatchCertified(batch_certified) => {
608                // Send the batch certificate to the primary.
609                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
610                Ok(true)
611            }
612            Event::BlockRequest(block_request) => {
613                let BlockRequest { start_height, end_height } = block_request;
614
615                // Ensure the block request is well-formed.
616                if start_height >= end_height {
617                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
618                }
619                // Ensure that the block request is within the allowed bounds.
620                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
621                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
622                }
623
624                // End height is exclusive.
625                let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;
626
627                let self_ = self.clone();
628                let blocks = match task::spawn_blocking(move || {
629                    // Retrieve the blocks within the requested range.
630                    match self_.ledger.get_blocks(start_height..end_height) {
631                        Ok(blocks) => Ok(DataBlocks(blocks)),
632                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
633                    }
634                })
635                .await
636                {
637                    Ok(Ok(blocks)) => blocks,
638                    Ok(Err(error)) => return Err(error),
639                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
640                };
641
642                let self_ = self.clone();
643                tokio::spawn(async move {
644                    // Send the `BlockResponse` message to the peer.
645                    let event =
646                        Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
647                    Transport::send(&self_, peer_ip, event).await;
648                });
649                Ok(true)
650            }
651            Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
652                // Process the block response. Except for some tests, there is always a sync sender.
653                if let Some(sync_sender) = self.sync_sender.get() {
654                    // Check the response corresponds to a request.
655                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
656                        bail!("Unsolicited block response from '{peer_ip}'")
657                    }
658
659                    // Perform the deferred non-blocking deserialization of the blocks.
660                    // The deserialization can take a long time (minutes). We should not be running
661                    // this on a blocking task, but on a rayon thread pool.
662                    let (send, recv) = tokio::sync::oneshot::channel();
663                    rayon::spawn_fifo(move || {
664                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
665                        let _ = send.send(blocks);
666                    });
667                    let blocks = match recv.await {
668                        Ok(Ok(blocks)) => blocks,
669                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
670                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
671                    };
672
673                    // Ensure the block response is well-formed.
674                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
675                    // Send the blocks to the sync module.
676                    match sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await {
677                        Ok(_) => Ok(true),
678                        Err(err) if err.is_benign() => {
679                            let err: anyhow::Error = err.into();
680                            let err = err.context(format!("Ignoring block response from peer '{peer_ip}'"));
681                            debug!("{}", flatten_error(err));
682                            Ok(true)
683                        }
684                        Err(err) if err.is_invalid_consensus_version() => {
685                            let err: anyhow::Error = err.into();
686                            let err = err.context(format!("Peer sent an invalid block response '{peer_ip}'"));
687
688                            let msg = flatten_error(&err);
689                            error!("{msg}");
690                            self.ip_ban_peer(peer_ip, Some(&msg));
691                            Err(err)
692                        }
693                        Err(err) => {
694                            let err: anyhow::Error = err.into();
695                            let err = err.context(format!("Peer '{peer_ip}' sent an invalid block response"));
696                            warn!("{}", flatten_error(err));
697
698                            // TODO(kaimast): This needs more testing to ensure disconnect is the correct action.
699                            Ok(true)
700                        }
701                    }
702                } else {
703                    debug!("Ignoring block response from '{peer_ip}' - no sync sender");
704                    Ok(true)
705                }
706            }
707            Event::CertificateRequest(certificate_request) => {
708                // Send the certificate request to the sync module.
709                // Except for some tests, there is always a sync sender.
710                if let Some(sync_sender) = self.sync_sender.get() {
711                    // Send the certificate request to the sync module.
712                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
713                }
714                Ok(true)
715            }
716            Event::CertificateResponse(certificate_response) => {
717                // Send the certificate response to the sync module.
718                // Except for some tests, there is always a sync sender.
719                if let Some(sync_sender) = self.sync_sender.get() {
720                    // Send the certificate response to the sync module.
721                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
722                }
723                Ok(true)
724            }
725            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
726                // Disconnect as the peer is not following the protocol.
727                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
728            }
729            Event::Disconnect(message) => {
730                // The peer informs us that they had disconnected. Disconnect from them too.
731                debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
732                self.disconnect(peer_ip);
733                Ok(false)
734            }
735            Event::PrimaryPing(ping) => {
736                let PrimaryPing { version, block_locators, primary_certificate } = ping;
737
738                // Ensure the event version is not outdated.
739                if version < Event::<N>::VERSION {
740                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
741                }
742
743                // Log the validator's height.
744                debug!("Validator '{peer_ip}' is at height {}", block_locators.latest_locator_height());
745
746                // Update the peer locators. Except for some tests, there is always a sync sender.
747                if let Some(sync_sender) = self.sync_sender.get() {
748                    // Check the block locators are valid, and update the validators in the sync module.
749                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
750                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
751                    }
752                }
753
754                // Send the batch certificates to the primary.
755                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
756                Ok(true)
757            }
758            Event::TransmissionRequest(request) => {
759                // TODO (howardwu): Add rate limiting checks on this event, on a per-peer basis.
760                // Determine the worker ID.
761                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
762                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
763                    return Ok(true);
764                };
765                // Send the transmission request to the worker.
766                if let Some(sender) = self.get_worker_sender(worker_id) {
767                    // Send the transmission request to the worker.
768                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
769                }
770                Ok(true)
771            }
772            Event::TransmissionResponse(response) => {
773                // Determine the worker ID.
774                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
775                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
776                    return Ok(true);
777                };
778                // Send the transmission response to the worker.
779                if let Some(sender) = self.get_worker_sender(worker_id) {
780                    // Send the transmission response to the worker.
781                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
782                }
783                Ok(true)
784            }
785            Event::ValidatorsRequest(_) => {
786                let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
787                connected_peers.shuffle(&mut rand::rng());
788
789                let self_ = self.clone();
790                tokio::spawn(async move {
791                    // Initialize the validators.
792                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
793                    // Iterate over the validators.
794                    for validator in connected_peers.into_iter() {
795                        // Add the validator to the list of validators.
796                        validators.insert(validator.listener_addr, validator.aleo_addr);
797                    }
798                    // Send the validators response to the peer.
799                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
800                    Transport::send(&self_, peer_ip, event).await;
801                });
802                Ok(true)
803            }
804            Event::ValidatorsResponse(response) => {
805                if self.trusted_peers_only {
806                    bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
807                }
808                let ValidatorsResponse { validators } = response;
809                // Ensure the number of validators is not too large.
810                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
811                // Ensure the cache contains a validators request for this peer.
812                if !self.cache.contains_outbound_validators_request(peer_ip) {
813                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
814                }
815                // Decrement the number of validators requests for this peer.
816                self.cache.decrement_outbound_validators_requests(peer_ip);
817
818                // Add valid validators as candidates to the peer pool; only validator-related
819                // filters need to be applied, the rest is handled by `PeerPoolHandling`.
820                let valid_addrs = validators
821                    .into_iter()
822                    .filter_map(|(listener_addr, aleo_addr)| {
823                        (self.account.address() != aleo_addr
824                            && !self.is_connected_address(aleo_addr)
825                            && self.is_authorized_validator_address(aleo_addr))
826                        .then_some((listener_addr, None))
827                    })
828                    .collect::<Vec<_>>();
829                if !valid_addrs.is_empty() {
830                    self.insert_candidate_peers(valid_addrs);
831                }
832
833                Ok(true)
834            }
835            Event::WorkerPing(ping) => {
836                // Ensure the number of transmissions is not too large.
837                ensure!(
838                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
839                    "{CONTEXT} Received too many transmissions"
840                );
841                // Retrieve the number of workers.
842                let num_workers = self.num_workers();
843                // Iterate over the transmission IDs.
844                for transmission_id in ping.transmission_ids.into_iter() {
845                    // Determine the worker ID.
846                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
847                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
848                        continue;
849                    };
850                    // Send the transmission ID to the worker.
851                    if let Some(sender) = self.get_worker_sender(worker_id) {
852                        // Send the transmission ID to the worker.
853                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
854                    }
855                }
856                Ok(true)
857            }
858        }
859    }
860
861    /// Initialize a new instance of the heartbeat.
862    fn initialize_heartbeat(&self) {
863        let self_clone = self.clone();
864        self.spawn(async move {
865            // Sleep briefly to ensure the other nodes are ready to connect.
866            tokio::time::sleep(Duration::from_millis(1000)).await;
867            info!("Starting the heartbeat of the gateway...");
868            loop {
869                // Process a heartbeat in the gateway.
870                self_clone.heartbeat().await;
871                // Sleep for the heartbeat interval.
872                tokio::time::sleep(Duration::from_secs(15)).await;
873            }
874        });
875    }
876
877    /// Spawns a task with the given future; it should only be used for long-running tasks.
878    #[allow(dead_code)]
879    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
880        self.handles.lock().push(tokio::spawn(future));
881    }
882
883    /// Shuts down the gateway.
884    pub async fn shut_down(&self) {
885        info!("Shutting down the gateway...");
886        // Save the best peers for future use.
887        if let Err(e) = self.save_best_peers(&self.node_data_dir.gateway_peer_cache_path(), None, true) {
888            warn!("Failed to persist best validators to disk: {e}");
889        }
890        // Abort the tasks.
891        self.handles.lock().iter().for_each(|handle| handle.abort());
892        // Close the listener.
893        self.tcp.shut_down().await;
894    }
895}
896
897impl<N: Network> Gateway<N> {
    /// The minimum time between connection attempts to a peer.
    ///
    /// `handle_min_connected_validators` skips candidate peers whose last connection attempt
    /// is more recent than this.
    const MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS: Duration = Duration::from_secs(10);
    /// The uptime after which nodes log a warning about missing validator connections.
    ///
    /// Until the TCP stack's uptime exceeds this, `log_connected_validators` reports a missing
    /// quorum at debug level only and does not list the unconnected validators.
    const MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD: Duration = Duration::from_secs(60);
902
    /// Handles the heartbeat request.
    ///
    /// Runs a single pass of the gateway's periodic maintenance. The steps are executed
    /// strictly in the order below; the two `async` steps are awaited before the next one starts.
    async fn heartbeat(&self) {
        // Log the connected validators.
        self.log_connected_validators();
        // Log the validator participation scores.
        #[cfg(feature = "telemetry")]
        self.log_participation_scores();
        // Keep the trusted validators connected.
        self.handle_trusted_validators();
        // Keep the bootstrap peers within the allowed range.
        self.handle_bootstrap_peers().await;
        // Disconnect from any validators that are not in the current committee.
        self.handle_unauthorized_validators();
        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
        self.handle_min_connected_validators().await;
        // Unban any addresses whose ban time has expired.
        self.handle_banned_ips();
    }
921
922    /// Logs the connected validators.
923    fn log_connected_validators(&self) {
924        // Retrieve the connected validators and current committee.
925        // The gatway may also be connected to bootstrap clients, which we should not log as connected validators.
926        let connected_validators = self.filter_connected_peers(|peer| peer.node_type == NodeType::Validator);
927
928        let committee = match self.ledger.current_committee() {
929            Ok(c) => c,
930            Err(err) => {
931                error!("Failed to get current committee: {err}");
932                return;
933            }
934        };
935
936        // Resolve the total number of connectable validators.
937        let validators_total = committee.num_members().saturating_sub(1);
938        // Format the total validators message.
939        let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
940        // Construct the connections message.
941        let connections_msg = match connected_validators.len() {
942            0 => "No connected validators".to_string(),
943            num_connected => format!("Connected to {num_connected} validators {total_validators}"),
944        };
945        info!("{connections_msg}");
946
947        // Collect the connected validator addresses and stake.
948        let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
949        let mut connected_validator_shas: HashMap<SmolStr, u64> = HashMap::with_capacity(connected_validators.len());
950        // Insert our sha.
951        let our_sha = shorten_snarkos_sha(&get_repo_commit_hash());
952        let our_stake = committee.get_stake(self.account.address());
953        connected_validator_shas.insert(our_sha.clone(), our_stake);
954        // Include our own address.
955        connected_validator_addresses.insert(self.account.address());
956        // Include and log the connected validators.
957        for peer in &connected_validators {
958            // Register the Aleo address.
959            let address = peer.aleo_addr;
960            connected_validator_addresses.insert(address);
961            // Register the snarkOS commit SHA and the associated stake.
962            let address_stake = committee.get_stake(address);
963            let short_peer_sha = shorten_snarkos_sha(&peer.snarkos_sha);
964            *connected_validator_shas.entry(short_peer_sha.clone()).or_default() += address_stake;
965
966            debug!(
967                "{}",
968                format!(
969                    "  Connected to: {} - {} (connection age {:?})",
970                    peer.listener_addr,
971                    peer.aleo_addr,
972                    peer.first_seen.elapsed()
973                )
974                .dimmed()
975            );
976        }
977
978        // Log how much of the stake uses our git commit hash.
979        if let Some(combined_stake) = connected_validator_shas.get(&our_sha) {
980            let percentage = *combined_stake as f64 / committee.total_stake() as f64 * 100.0;
981            debug!("{}", format!("  Combined stake @ {our_sha}: {percentage:.2}%").dimmed());
982            #[cfg(feature = "metrics")]
983            metrics::gauge(metrics::bft::CONNECTED_STAKE_WITH_MATCHING_SHA, percentage);
984        }
985
986        // Log the validators that are not connected.
987        let num_not_connected = validators_total.saturating_sub(connected_validators.len());
988        if num_not_connected > 0 && self.tcp().uptime() > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
989            // Cache the total stake for computing percentages.
990            let total_stake = committee.total_stake();
991            let total_stake_f64 = total_stake as f64;
992
993            // Collect the committee members.
994            let committee_members: HashSet<_> =
995                self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
996
997            let not_connected_stake: u64 = committee_members
998                .difference(&connected_validator_addresses)
999                .map(|address| {
1000                    let address_stake = committee.get_stake(*address);
1001                    let address_stake_as_percentage =
1002                        if total_stake == 0 { 0.0 } else { address_stake as f64 / total_stake_f64 * 100.0 };
1003                    debug!(
1004                        "{}",
1005                        format!("  Not connected to {address} ({address_stake_as_percentage:.2}% of total stake)")
1006                            .dimmed()
1007                    );
1008                    address_stake
1009                })
1010                .sum();
1011
1012            let not_connected_stake_as_percentage =
1013                if total_stake == 0 { 0.0 } else { not_connected_stake as f64 / total_stake_f64 * 100.0 };
1014            warn!(
1015                "Not connected to {num_not_connected} validators {total_validators} ({not_connected_stake_as_percentage:.2}% of total stake not connected)"
1016            );
1017            #[cfg(feature = "metrics")]
1018            {
1019                let connected_stake_as_percentage = 100.0 - not_connected_stake_as_percentage;
1020                metrics::gauge(metrics::bft::CONNECTED_STAKE, connected_stake_as_percentage);
1021            }
1022        } else {
1023            #[cfg(feature = "metrics")]
1024            metrics::gauge(metrics::bft::CONNECTED_STAKE, 100.0);
1025        };
1026
1027        if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
1028            // Not being connected to a quorum of validators is begning during startup.
1029            if self.tcp().uptime() > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
1030                error!("Not connected to a quorum of validators");
1031            } else {
1032                debug!("Not connected to a quorum of validators");
1033            }
1034        }
1035    }
1036
1037    // Logs the validator participation scores.
1038    #[cfg(feature = "telemetry")]
1039    fn log_participation_scores(&self) {
1040        if let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(self.storage.current_round()) {
1041            // Retrieve the participation scores.
1042            let participation_scores = self.validator_telemetry().get_participation_scores(&committee_lookback);
1043
1044            // Log the participation scores.
1045            debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
1046            for (address, (cert_score, sig_score)) in participation_scores {
1047                debug!(
1048                    "{}",
1049                    format!("  {address} - certificates: {cert_score:.2}%  signatures: {sig_score:.2}%").dimmed()
1050                );
1051            }
1052        }
1053    }
1054
1055    /// This function attempts to connect to any disconnected trusted validators.
1056    fn handle_trusted_validators(&self) {
1057        let trusted_peers = self.trusted_peers();
1058
1059        // Attempt to re-establish connections with any trusted peer that is not connected already.
1060        let handles: Vec<JoinHandle<_>> = trusted_peers
1061            .iter()
1062            .filter_map(|validator_ip| {
1063                // Attempt to connect to the trusted validator.
1064                match self.connect(*validator_ip) {
1065                    Ok(hdl) => Some(hdl),
1066                    Err(ConnectError::SelfConnect { .. })
1067                    | Err(ConnectError::AlreadyConnected { .. })
1068                    | Err(ConnectError::AlreadyConnecting { .. }) => None,
1069                    Err(err) => {
1070                        warn!("Could not initiate connection to trusted validator at '{validator_ip}' - {err}");
1071                        None
1072                    }
1073                }
1074            })
1075            .collect();
1076
1077        if !handles.is_empty() {
1078            info!("Reconnnecting to {} out of {} trusted validators", handles.len(), trusted_peers.len());
1079        }
1080    }
1081
1082    /// This function keeps the number of bootstrap peers within the allowed range.
1083    async fn handle_bootstrap_peers(&self) {
1084        // Return early if we are in trusted peers only mode.
1085        if self.trusted_peers_only {
1086            return;
1087        }
1088        // Split the bootstrap peers into connected and candidate lists.
1089        let mut candidate_bootstrap = Vec::new();
1090        let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
1091        for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
1092            if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
1093                candidate_bootstrap.push(bootstrap_ip);
1094            }
1095        }
1096        // If there are not enough connected bootstrap peers, connect to more.
1097        if connected_bootstrap.is_empty() {
1098            // Sample a random bootstrap peer to connect to (drop rng before any await).
1099            let peer_to_connect = candidate_bootstrap.into_iter().choose(&mut rand::rng());
1100            if let Some(peer_ip) = peer_to_connect {
1101                match self.connect(peer_ip) {
1102                    Ok(hdl) => {
1103                        debug!("{CONTEXT} (Re-)connecting to bootstrap peer at '{peer_ip}'");
1104                        let result = hdl.await;
1105                        if let Err(err) = result {
1106                            warn!("{CONTEXT} Failed to connect to bootstrap peer at '{peer_ip}' - {err}");
1107                        }
1108                    }
1109                    Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => {}
1110                    Err(err) => {
1111                        warn!("{CONTEXT} Could not initiate connection to bootstrap peer at '{peer_ip}' - {err}")
1112                    }
1113                }
1114            }
1115        }
1116        // Determine if the node is connected to more bootstrap peers than allowed.
1117        let num_surplus = connected_bootstrap.len().saturating_sub(1);
1118        if num_surplus > 0 {
1119            // Sample peers to disconnect (drop rng before any await).
1120            let peers_to_disconnect = connected_bootstrap.into_iter().sample(&mut rand::rng(), num_surplus);
1121            for peer in peers_to_disconnect {
1122                info!("{CONTEXT} Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
1123                <Self as Transport<N>>::send(
1124                    self,
1125                    peer.listener_addr,
1126                    Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
1127                )
1128                .await;
1129                // Disconnect from this peer.
1130                self.disconnect(peer.listener_addr);
1131            }
1132        }
1133    }
1134
1135    /// This function attempts to disconnect any validators that are not in the current committee.
1136    fn handle_unauthorized_validators(&self) {
1137        let self_ = self.clone();
1138        tokio::spawn(async move {
1139            // Retrieve the connected validators.
1140            let validators = self_.get_connected_peers();
1141            // Iterate over the validator IPs.
1142            for peer in validators {
1143                // Skip bootstrapper peers.
1144                if peer.node_type == NodeType::BootstrapClient {
1145                    continue;
1146                }
1147                // Disconnect any validator that is not in the current committee.
1148                if !self_.is_authorized_validator_ip(peer.listener_addr) {
1149                    warn!(
1150                        "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1151                        peer.listener_addr
1152                    );
1153                    Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1154                    // Disconnect from this peer.
1155                    self_.disconnect(peer.listener_addr);
1156                }
1157            }
1158        });
1159    }
1160
1161    /// This function sends a `ValidatorsRequest` to a random validator,
1162    /// if the number of connected validators is less than the minimum.
1163    /// It also attempts to connect to known unconnected validators.
1164    async fn handle_min_connected_validators(&self) {
1165        // Attempt to connect to untrusted validators we're not connected to yet.
1166        // The trusted ones are already handled by `handle_trusted_validators`.
1167        let trusted_validators = self.trusted_peers();
1168        if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES() as usize {
1169            let (addrs, handles): (Vec<_>, Vec<_>) = self
1170                .get_candidate_peers()
1171                .iter()
1172                .filter_map(|peer| {
1173                    if trusted_validators.contains(&peer.listener_addr) {
1174                        return None;
1175                    }
1176
1177                    if let Some(previous_attempt) = peer.last_connection_attempt
1178                        && previous_attempt.elapsed() < Self::MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS
1179                    {
1180                        return None;
1181                    }
1182
1183                    match self.connect(peer.listener_addr) {
1184                        Ok(hdl) => Some((peer.listener_addr, hdl)),
1185                        Err(ConnectError::AlreadyConnected { .. })
1186                        | Err(ConnectError::AlreadyConnecting { .. })
1187                        | Err(ConnectError::SelfConnect { .. }) => None,
1188                        Err(err) => {
1189                            warn!(
1190                                "{CONTEXT} Could not initiate connection to validator at '{}' - {err}",
1191                                peer.listener_addr
1192                            );
1193                            None
1194                        }
1195                    }
1196                })
1197                .unzip();
1198
1199            for (addr, result) in addrs.into_iter().zip(join_all(handles).await) {
1200                if let Err(err) = result {
1201                    warn!("{CONTEXT} Failed to connect to validator at '{addr}' - {err}");
1202                }
1203            }
1204
1205            // Retrieve the connected validators.
1206            let validators = self.connected_peers();
1207            // If there are no validator IPs to connect to, return early.
1208            if validators.is_empty() {
1209                return;
1210            }
1211            // Select a random validator IP.
1212            if let Some(validator_ip) = validators.into_iter().choose(&mut rand::rng()) {
1213                let self_ = self.clone();
1214                tokio::spawn(async move {
1215                    // Increment the number of outbound validators requests for this validator.
1216                    self_.cache.increment_outbound_validators_requests(validator_ip);
1217                    // Send a `ValidatorsRequest` to the validator.
1218                    let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
1219                });
1220            }
1221        }
1222    }
1223
1224    /// Processes a message received from the network.
1225    async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1226        // Process the message. Disconnect if the peer violated the protocol.
1227        if let Err(error) = self.inbound(peer_addr, message).await
1228            && let Some(peer_ip) = self.resolver.read().get_listener(peer_addr)
1229        {
1230            warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1231            let self_ = self.clone();
1232            tokio::spawn(async move {
1233                Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1234                // Disconnect from this peer.
1235                self_.disconnect(peer_ip);
1236            });
1237        }
1238    }
1239
    /// Removes addresses whose ban time has expired.
    ///
    /// Delegates to `remove_old_bans` on the TCP stack's banned peers, passing
    /// `IP_BAN_TIME_IN_SECS` as the ban duration.
    fn handle_banned_ips(&self) {
        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
    }
1244}
1245
1246#[async_trait]
1247impl<N: Network> Transport<N> for Gateway<N> {
    /// Sends the given event to specified peer.
    ///
    /// This method is rate limited to prevent spamming the peer: while the per-peer counter
    /// for the event's category exceeds its limit, the call sleeps in 10 ms steps, and only
    /// then is the event handed to `send_inner`.
    ///
    /// This function returns as soon as the event is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the event has been delivered.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Rate-limits and sends `event` to `peer_ip`.
        // - `$cache_map`: the cache method to call with `(peer_ip, $interval)`; it is invoked on
        //   every iteration of the wait loop and its result is compared against the limit.
        //   NOTE(review): presumably it records one outbound event and returns the number of
        //   events within `$interval` - confirm against `Cache`.
        // - `$freq`: the method on `$self` that returns the limit.
        macro_rules! send {
            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
                // Rate limit the number of events of this category sent to the peer.
                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
                    // Sleep for a short period of time to allow the cache to clear.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                // Send the event to the peer.
                $self.send_inner(peer_ip, event)
            }};
        }

        // Increment the cache for certificate, transmission and block events.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer, using the certificate-specific rate limit.
                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
            }
            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer, using the transmission-specific rate limit.
                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
            }
            Event::BlockRequest(request) => {
                // Insert the outbound request so we can match it to responses.
                self.cache.insert_outbound_block_request(peer_ip, request);
                // Send the event to the peer and update the outbound event cache, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
            _ => {
                // Send the event to the peer, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
        }
    }
1294
1295    /// Broadcasts the given event to all connected peers.
1296    // TODO(ljedrz): the event should be checked for the presence of Data::Object, and
1297    // serialized in advance if it's there.
1298    fn broadcast(&self, event: Event<N>) {
1299        // Ensure there are connected peers.
1300        if self.number_of_connected_peers() > 0 {
1301            let self_ = self.clone();
1302            let connected_peers = self.connected_peers();
1303            tokio::spawn(async move {
1304                // Iterate through all connected peers.
1305                for peer_ip in connected_peers {
1306                    // Send the event to the peer.
1307                    let _ = Transport::send(&self_, peer_ip, event.clone()).await;
1308                }
1309            });
1310        }
1311    }
1312}
1313
// Gives the `snarkos_node_tcp` protocol traits access to the gateway's TCP stack.
impl<N: Network> P2P for Gateway<N> {
    /// Returns a reference to the TCP instance.
    fn tcp(&self) -> &Tcp {
        &self.tcp
    }
}
1320
1321#[async_trait]
1322impl<N: Network> Reading for Gateway<N> {
1323    type Codec = EventCodec<N>;
1324    type Message = Event<N>;
1325
1326    /// Creates a [`Decoder`] used to interpret messages from the network.
1327    /// The `side` param indicates the connection side **from the node's perspective**.
1328    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1329        Default::default()
1330    }
1331
1332    /// Processes a message received from the network.
1333    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
1334        if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
1335            let self_ = self.clone();
1336            // Handle BlockRequest and BlockResponse messages in a separate task to not block the
1337            // inbound queue.
1338            tokio::spawn(async move {
1339                self_.process_message_inner(peer_addr, message).await;
1340            });
1341        } else {
1342            self.process_message_inner(peer_addr, message).await;
1343        }
1344        Ok(())
1345    }
1346
1347    /// Computes the depth of per-connection queues used to process inbound messages, sufficient to process the maximum expected load at any givent moment.
1348    /// The greater it is, the more inbound messages the node can enqueue, but a too large value can make the node more susceptible to DoS attacks.
1349    fn message_queue_depth(&self) -> usize {
1350        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1351            * N::LATEST_MAX_CERTIFICATES() as usize
1352            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1353    }
1354}
1355
1356#[async_trait]
1357impl<N: Network> Writing for Gateway<N> {
1358    type Codec = EventCodec<N>;
1359    type Message = Event<N>;
1360
1361    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
1362    /// The `side` parameter indicates the connection side **from the node's perspective**.
1363    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1364        Default::default()
1365    }
1366
1367    /// Computes the depth of per-connection queues used to send outbound messages, sufficient to process the maximum expected load at any givent moment.
1368    /// The greater it is, the more outbound messages the node can enqueue. A too large value large value might obscure potential issues with your implementation
1369    /// (like slow serialization) or network.
1370    fn message_queue_depth(&self) -> usize {
1371        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1372            * N::LATEST_MAX_CERTIFICATES() as usize
1373            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1374    }
1375}
1376
1377#[async_trait]
1378impl<N: Network> Disconnect for Gateway<N> {
1379    /// Any extra operations to be performed during a disconnect.
1380    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1381        if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1382            // TODO(kaimast): This can, in theory, still lead to race conditions, if we immediately reconnect to the same peer.
1383            // In practice, there should always be a significant delay between those two delays, so it is not an immediate issue.
1384            //
1385            // To properly fix this, we either needk hold a lock here, or add a dedicated "disconnecting" state, so that
1386            // a peer is not re-added while the rest of the disconnect logic is running.
1387            let was_fully_connected = self.downgrade_peer_to_candidate(peer_ip);
1388
1389            // Remove the peer from the sync module. Except for some tests, there is always a sync sender.
1390            if was_fully_connected && let Some(sync_sender) = self.sync_sender.get() {
1391                let (tx, rx) = oneshot::channel();
1392
1393                if let Err(err) = sync_sender.tx_block_sync_remove_peer.send((peer_ip, tx)).await {
1394                    let err: anyhow::Error = err.into();
1395                    let err =
1396                        err.context(format!("Unable to remove disconnecting peer '{peer_ip}' from the sync module"));
1397                    warn!("{CONTEXT} {}", flatten_error(err));
1398                }
1399
1400                if let Err(err) = rx.await {
1401                    let err: anyhow::Error = err.into();
1402                    let err =
1403                        err.context(format!("Unable to remove disconnecting peer '{peer_ip}' from the sync module"));
1404                    warn!("{CONTEXT} {}", flatten_error(err));
1405                }
1406            }
1407            // We don't clear this map based on time but only on peer disconnect.
1408            // This is sufficient to avoid infinite growth as the committee has a fixed number
1409            // of members.
1410            self.cache.clear_outbound_validators_requests(peer_ip);
1411            self.cache.clear_outbound_block_requests(peer_ip);
1412        } else {
1413            warn!("{CONTEXT} Got disconnect for a peer '{peer_addr}' that is not in the peer pool");
1414        }
1415    }
1416}
1417
1418#[async_trait]
1419impl<N: Network> OnConnect for Gateway<N> {
1420    async fn on_connect(&self, peer_addr: SocketAddr) {
1421        if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1422            if let Some(peer) = self.get_connected_peer(listener_addr) {
1423                if peer.node_type == NodeType::BootstrapClient {
1424                    self.cache.increment_outbound_validators_requests(listener_addr);
1425                    let _ =
1426                        <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1427                            .await;
1428                }
1429            }
1430        }
1431    }
1432}
1433
1434#[async_trait]
1435impl<N: Network> Handshake for Gateway<N> {
1436    /// Performs the handshake protocol.
1437    async fn perform_handshake(&self, mut connection: Connection) -> Result<Connection, ConnectError> {
1438        // Perform the handshake.
1439        let peer_addr = connection.addr();
1440        let peer_side = connection.side();
1441
1442        // Check (or impose) IP-level bans.
1443        #[cfg(not(test))]
1444        if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
1445            // If the IP is already banned reject the connection.
1446            if self.is_ip_banned(peer_addr.ip()) {
1447                trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
1448                return Err(ConnectError::BannedIp { ip: peer_addr.ip() });
1449            }
1450
1451            let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);
1452
1453            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
1454            if num_attempts > MAX_CONNECTION_ATTEMPTS {
1455                self.update_ip_ban(peer_addr.ip());
1456                trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
1457                return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_addr.ip())));
1458            }
1459        }
1460
1461        let stream = self.borrow_stream(&mut connection);
1462
1463        // If this is an inbound connection, we log it, but don't know the listening address yet.
1464        // Otherwise, we can immediately register the listening address.
1465        let mut listener_addr = if peer_side == ConnectionSide::Initiator {
1466            debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
1467            None
1468        } else {
1469            debug!("{CONTEXT} Shaking hands with {peer_addr}...");
1470            Some(peer_addr)
1471        };
1472
1473        // Retrieve the restrictions ID.
1474        let restrictions_id = self.ledger.latest_restrictions_id();
1475
1476        // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
1477        let handshake_result = if peer_side == ConnectionSide::Responder {
1478            self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
1479        } else {
1480            self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
1481        };
1482
1483        if let Some(addr) = listener_addr {
1484            match handshake_result {
1485                Ok(ref cr) => {
1486                    let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
1487                        NodeType::BootstrapClient
1488                    } else {
1489                        NodeType::Validator
1490                    };
1491
1492                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1493                        self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
1494                        peer.upgrade_to_connected(
1495                            peer_addr,
1496                            cr.listener_port,
1497                            cr.address,
1498                            node_type,
1499                            cr.version,
1500                            cr.snarkos_sha,
1501                            ConnectionMode::Gateway,
1502                        );
1503                    }
1504                    info!("{CONTEXT} Connected to '{addr}'");
1505                }
1506                Err(error) => {
1507                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
1508                        // The peer may only be downgraded if it's a ConnectingPeer.
1509                        if peer.is_connecting() {
1510                            peer.downgrade_to_candidate(addr);
1511                        }
1512                    }
1513                    return Err(error);
1514                }
1515            }
1516        }
1517
1518        Ok(connection)
1519    }
1520}
1521
/// A macro unwrapping the expected handshake event or returning an error for unexpected events.
///
/// Arguments:
/// - `$event_ty`: the path of the expected `Event` variant (e.g. `Event::ChallengeRequest`);
/// - `$framed`: the framed stream to read the next event from;
/// - `$peer_addr`: the peer's address, used only in log and error messages.
///
/// Evaluates to the payload of the expected variant. In every other case it returns early from
/// the enclosing function with a `ConnectError`, so it may only be used inside an `async` context
/// whose return type is `Result<_, ConnectError>`.
macro_rules! expect_event {
    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
        match $framed.try_next().await? {
            // Received the expected event, proceed.
            Some($event_ty(data)) => {
                trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer_addr);
                data
            }
            // Received a disconnect event, abort.
            Some(Event::Disconnect($crate::events::Disconnect { reason })) => {
                return Err(ConnectError::other(format!("'{}' disconnected with reason \"{reason}\"", $peer_addr)));
            }
            // Received an unexpected event, abort.
            Some(ty) => {
                return Err(ConnectError::other(format!(
                    "'{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    ty.name(),
                    // Name the expected variant; this must reference the declared metavariable,
                    // as an undeclared one would be stringified verbatim.
                    stringify!($event_ty),
                )));
            }
            // Received nothing.
            None => return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into())),
        }
    };
}
1549
1550/// Send the given message to the peer.
1551async fn send_event<N: Network>(
1552    framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
1553    peer_addr: SocketAddr,
1554    event: Event<N>,
1555) -> io::Result<()> {
1556    trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
1557    framed.send(event).await
1558}
1559
1560impl<N: Network> Gateway<N> {
1561    /// The connection initiator side of the handshake.
1562    async fn handshake_inner_initiator<'a>(
1563        &'a self,
1564        peer_addr: SocketAddr,
1565        restrictions_id: Field<N>,
1566        stream: &'a mut TcpStream,
1567    ) -> Result<ChallengeRequest<N>, ConnectError> {
1568        // Introduce the peer into the peer pool.
1569        self.add_connecting_peer(peer_addr)?;
1570
1571        // Construct the stream.
1572        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1573
1574        /* Step 1: Send the challenge request. */
1575
1576        // Sample a random nonce.
1577        let our_nonce: u64 = rand::random();
1578        // Determine the snarkOS SHA to send to the peer.
1579        let current_block_height = self.ledger.latest_block_height();
1580        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1581        let snarkos_sha = match (self.is_dev(), consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1582            (true, _, Some(sha)) => Some(sha),
1583            (_, true, Some(sha)) => Some(sha),
1584            _ => None,
1585        };
1586        // Send a challenge request to the peer.
1587        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1588        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1589
1590        /* Step 2: Receive the peer's challenge response followed by the challenge request. */
1591
1592        // Listen for the challenge response message.
1593        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1594        // Listen for the challenge request message.
1595        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1596
1597        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1598        if let Some(reason) = self
1599            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1600            .await
1601        {
1602            send_event(&mut framed, peer_addr, reason.into()).await?;
1603            return Err(ConnectError::application(reason));
1604        }
1605
1606        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1607        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1608            send_event(&mut framed, peer_addr, reason.into()).await?;
1609            return Err(reason.into_connect_error(peer_addr));
1610        }
1611
1612        /* Step 3: Send the challenge response. */
1613
1614        // Sign the counterparty nonce.
1615        let response_nonce: u64 = rand::random();
1616        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1617        let Ok(our_signature) = self.account.sign_bytes(&data, &mut rand::rng()) else {
1618            return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
1619        };
1620        // Send the challenge response.
1621        let our_response =
1622            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1623        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1624
1625        Ok(peer_request)
1626    }
1627
1628    /// The connection responder side of the handshake.
1629    async fn handshake_inner_responder<'a>(
1630        &'a self,
1631        peer_addr: SocketAddr,
1632        peer_ip: &mut Option<SocketAddr>,
1633        restrictions_id: Field<N>,
1634        stream: &'a mut TcpStream,
1635    ) -> Result<ChallengeRequest<N>, ConnectError> {
1636        // Construct the stream.
1637        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());
1638
1639        /* Step 1: Receive the challenge request. */
1640
1641        // Listen for the challenge request message.
1642        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);
1643
1644        // Ensure the address is not the same as this node.
1645        if self.account.address() == peer_request.address {
1646            return Err(ConnectError::SelfConnect { address: peer_addr });
1647        }
1648
1649        // Obtain the peer's listening address.
1650        *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
1651        let peer_ip = peer_ip.unwrap();
1652
1653        // Knowing the peer's listening address, ensure it is allowed to connect.
1654        if let Err(reason) = self.ensure_peer_is_allowed(peer_ip) {
1655            send_event(&mut framed, peer_addr, reason.into()).await?;
1656            return Err(reason.into_connect_error(peer_addr));
1657        }
1658
1659        // Introduce the peer into the peer pool.
1660        self.add_connecting_peer(peer_ip)?;
1661
1662        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
1663        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
1664            send_event(&mut framed, peer_addr, reason.into()).await?;
1665            return Err(reason.into_connect_error(peer_addr));
1666        }
1667
1668        /* Step 2: Send the challenge response followed by own challenge request. */
1669
1670        // Sign the counterparty nonce.
1671        let response_nonce: u64 = rand::random();
1672        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
1673        let Ok(our_signature) = self.account.sign_bytes(&data, &mut rand::rng()) else {
1674            return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
1675        };
1676        // Send the challenge response.
1677        let our_response =
1678            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
1679        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;
1680
1681        // Sample a random nonce.
1682        let our_nonce: u64 = rand::random();
1683        // Determine the snarkOS SHA to send to the peer.
1684        let current_block_height = self.ledger.latest_block_height();
1685        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
1686        let snarkos_sha = match (self.is_dev(), consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
1687            (true, _, Some(sha)) => Some(sha),
1688            (_, true, Some(sha)) => Some(sha),
1689            _ => None,
1690        };
1691        // Send the challenge request.
1692        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
1693        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;
1694
1695        /* Step 3: Receive the challenge response. */
1696
1697        // Listen for the challenge response message.
1698        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
1699        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
1700        if let Some(reason) = self
1701            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
1702            .await
1703        {
1704            send_event(&mut framed, peer_addr, reason.into()).await?;
1705            Err(reason.into_connect_error(peer_addr))
1706        } else {
1707            Ok(peer_request)
1708        }
1709    }
1710
1711    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
1712    #[must_use]
1713    fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
1714        // Retrieve the components of the challenge request.
1715        let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
1716        log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);
1717
1718        let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);
1719
1720        // Ensure the event protocol version is not outdated.
1721        if version < Event::<N>::VERSION {
1722            return Some(DisconnectReason::OutdatedClientVersion);
1723        }
1724        // If the node is in trusted peers only mode, ensure the peer is trusted.
1725        if self.trusted_peers_only && !self.is_trusted(listener_addr) {
1726            warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
1727            return Some(DisconnectReason::NoExternalPeersAllowed);
1728        }
1729        if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
1730            // Ensure the address is a current committee member.
1731            if !self.is_authorized_validator_address(address) {
1732                return Some(DisconnectReason::UnauthorizedValidator);
1733            }
1734        }
1735
1736        // Ensure the address is not already connected.
1737        if self.is_connected_address(address) {
1738            return Some(DisconnectReason::AlreadyConnectedToAleoAddress);
1739        }
1740
1741        None
1742    }
1743
1744    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
1745    #[must_use]
1746    async fn verify_challenge_response(
1747        &self,
1748        peer_addr: SocketAddr,
1749        peer_address: Address<N>,
1750        response: ChallengeResponse<N>,
1751        expected_restrictions_id: Field<N>,
1752        expected_nonce: u64,
1753    ) -> Option<DisconnectReason> {
1754        // Retrieve the components of the challenge response.
1755        let ChallengeResponse { restrictions_id, signature, nonce } = response;
1756
1757        // Verify the restrictions ID.
1758        if restrictions_id != expected_restrictions_id {
1759            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
1760            return Some(DisconnectReason::InvalidChallengeResponse);
1761        }
1762        // Perform the deferred non-blocking deserialization of the signature.
1763        let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1764            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
1765            return Some(DisconnectReason::InvalidChallengeResponse);
1766        };
1767        // Verify the signature.
1768        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
1769            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
1770            return Some(DisconnectReason::InvalidChallengeResponse);
1771        }
1772        None
1773    }
1774}
1775
1776#[cfg(test)]
1777mod prop_tests {
1778    use crate::{
1779        Gateway,
1780        MAX_WORKERS,
1781        MEMORY_POOL_PORT,
1782        Worker,
1783        helpers::{Storage, init_primary_channels, init_worker_channels},
1784    };
1785
1786    use snarkos_account::Account;
1787    use snarkos_node_bft_ledger_service::MockLedgerService;
1788    use snarkos_node_bft_storage_service::BFTMemoryService;
1789    use snarkos_node_network::PeerPoolHandling;
1790    use snarkos_node_tcp::P2P;
1791    use snarkos_utilities::NodeDataDir;
1792
1793    use snarkos_node_bft_events::committee_prop_tests::{CommitteeContext, ValidatorSet};
1794    use snarkvm::{
1795        ledger::{
1796            committee::{Committee, test_helpers::sample_committee_for_round_and_members},
1797            narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
1798        },
1799        prelude::{MainnetV0, PrivateKey},
1800        utilities::TestRng,
1801    };
1802
1803    use indexmap::{IndexMap, IndexSet};
1804    use proptest::{
1805        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
1806        sample::Selector,
1807    };
1808    use std::{
1809        fmt::{Debug, Formatter},
1810        net::{IpAddr, Ipv4Addr, SocketAddr},
1811        sync::Arc,
1812    };
1813    use test_strategy::proptest;
1814
    /// The network that every gateway in these property tests is instantiated with.
    type CurrentNetwork = MainnetV0;
1816
1817    impl Debug for Gateway<CurrentNetwork> {
1818        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1819            // TODO implement Debug properly and move it over to production code
1820            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
1821        }
1822    }
1823
    /// The address configuration that a test gateway is constructed with.
    #[derive(Debug, test_strategy::Arbitrary)]
    enum GatewayAddress {
        /// A development gateway; the value is handed to `Gateway::new` as its final argument, and
        /// the tests expect the gateway to listen on localhost at `MEMORY_POOL_PORT` plus this value.
        Dev(u8),
        /// A production gateway with an optional explicit listening address; with `None`, the tests
        /// expect the unspecified IPv4 address and `MEMORY_POOL_PORT`.
        Prod(Option<SocketAddr>),
    }
1829
1830    impl GatewayAddress {
1831        fn ip(&self) -> Option<SocketAddr> {
1832            if let GatewayAddress::Prod(ip) = self {
1833                return *ip;
1834            }
1835            None
1836        }
1837
1838        fn port(&self) -> Option<u16> {
1839            if let GatewayAddress::Dev(port) = self {
1840                return Some(*port as u16);
1841            }
1842            None
1843        }
1844    }
1845
1846    impl Arbitrary for Gateway<CurrentNetwork> {
1847        type Parameters = ();
1848        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;
1849
1850        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1851            any_valid_dev_gateway()
1852                .prop_map(|(storage, _, private_key, address)| {
1853                    Gateway::new(
1854                        Account::try_from(private_key).unwrap(),
1855                        storage.clone(),
1856                        storage.ledger().clone(),
1857                        address.ip(),
1858                        &[],
1859                        false,
1860                        NodeDataDir::new_test(None),
1861                        address.port(),
1862                    )
1863                    .unwrap()
1864                })
1865                .boxed()
1866        }
1867    }
1868
    /// The generated inputs for a gateway test: the storage, the committee context it was generated
    /// from, the private key of a validator selected from that context, and the gateway address.
    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);
1870
1871    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
1872        (any::<CommitteeContext>(), any::<Selector>())
1873            .prop_flat_map(|(context, account_selector)| {
1874                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1875                (
1876                    any_with::<Storage<CurrentNetwork>>(context.clone()),
1877                    Just(context),
1878                    Just(account_selector.select(validators)),
1879                    0u8..,
1880                )
1881                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, GatewayAddress::Dev(d)))
1882            })
1883            .boxed()
1884    }
1885
1886    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
1887        (any::<CommitteeContext>(), any::<Selector>())
1888            .prop_flat_map(|(context, account_selector)| {
1889                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
1890                (
1891                    any_with::<Storage<CurrentNetwork>>(context.clone()),
1892                    Just(context),
1893                    Just(account_selector.select(validators)),
1894                    any::<Option<SocketAddr>>(),
1895                )
1896                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, GatewayAddress::Prod(d)))
1897            })
1898            .boxed()
1899    }
1900
1901    #[proptest]
1902    fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1903        let (storage, _, private_key, dev) = input;
1904        let account = Account::try_from(private_key).unwrap();
1905
1906        let gateway = Gateway::new(
1907            account.clone(),
1908            storage.clone(),
1909            storage.ledger().clone(),
1910            dev.ip(),
1911            &[],
1912            false,
1913            NodeDataDir::new_test(None),
1914            dev.port(),
1915        )
1916        .unwrap();
1917        let tcp_config = gateway.tcp().config();
1918        assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
1919        assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
1920
1921        let tcp_config = gateway.tcp().config();
1922        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size() * 10);
1923        assert_eq!(gateway.account().address(), account.address());
1924    }
1925
1926    #[proptest]
1927    fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1928        let (storage, _, private_key, dev) = input;
1929        let account = Account::try_from(private_key).unwrap();
1930
1931        let gateway = Gateway::new(
1932            account.clone(),
1933            storage.clone(),
1934            storage.ledger().clone(),
1935            dev.ip(),
1936            &[],
1937            false,
1938            NodeDataDir::new_test(None),
1939            dev.port(),
1940        )
1941        .unwrap();
1942        let tcp_config = gateway.tcp().config();
1943        if let Some(socket_addr) = dev.ip() {
1944            assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
1945            assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
1946        } else {
1947            assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
1948            assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
1949        }
1950
1951        let tcp_config = gateway.tcp().config();
1952        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size() * 10);
1953        assert_eq!(gateway.account().address(), account.address());
1954    }
1955
1956    #[proptest(async = "tokio")]
1957    async fn gateway_start(
1958        #[strategy(any_valid_dev_gateway())] input: GatewayInput,
1959        #[strategy(0..MAX_WORKERS)] workers_count: u8,
1960    ) {
1961        let (storage, committee, private_key, dev) = input;
1962        let committee = committee.0;
1963        let worker_storage = storage.clone();
1964        let account = Account::try_from(private_key).unwrap();
1965
1966        let gateway = Gateway::new(
1967            account,
1968            storage.clone(),
1969            storage.ledger().clone(),
1970            dev.ip(),
1971            &[],
1972            false,
1973            NodeDataDir::new_test(None),
1974            dev.port(),
1975        )
1976        .unwrap();
1977
1978        let (primary_sender, _) = init_primary_channels();
1979
1980        let (workers, worker_senders) = {
1981            // Construct a map of the worker senders.
1982            let mut tx_workers = IndexMap::new();
1983            let mut workers = IndexMap::new();
1984
1985            // Initialize the workers.
1986            for id in 0..workers_count {
1987                // Construct the worker channels.
1988                let (tx_worker, rx_worker) = init_worker_channels();
1989                // Construct the worker instance.
1990                let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1991                let worker =
1992                    Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
1993                        .unwrap();
1994                // Run the worker instance.
1995                worker.run(rx_worker);
1996
1997                // Add the worker and the worker sender to maps
1998                workers.insert(id, worker);
1999                tx_workers.insert(id, tx_worker);
2000            }
2001            (workers, tx_workers)
2002        };
2003
2004        gateway.run(primary_sender, worker_senders, None).await;
2005        assert_eq!(
2006            gateway.local_ip(),
2007            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
2008        );
2009        assert_eq!(gateway.num_workers(), workers.len() as u8);
2010    }
2011
2012    #[proptest]
2013    fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
2014        let rng = &mut TestRng::default();
2015
2016        // Initialize the round parameters.
2017        let current_round = 2;
2018        let committee_size = 4;
2019        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
2020        let (_, _, private_key, dev) = input;
2021        let account = Account::try_from(private_key).unwrap();
2022
2023        // Sample the certificates.
2024        let mut certificates = IndexSet::new();
2025        for _ in 0..committee_size {
2026            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
2027        }
2028        let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
2029        // Initialize the committee.
2030        let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
2031        // Sample extra certificates from non-committee members.
2032        for _ in 0..committee_size {
2033            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
2034        }
2035        // Initialize the ledger.
2036        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2037        // Initialize the storage.
2038        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
2039        // Initialize the gateway.
2040        let gateway = Gateway::new(
2041            account.clone(),
2042            storage.clone(),
2043            ledger.clone(),
2044            dev.ip(),
2045            &[],
2046            false,
2047            NodeDataDir::new_test(None),
2048            dev.port(),
2049        )
2050        .unwrap();
2051        // Insert certificate to the storage.
2052        for certificate in certificates.iter() {
2053            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2054        }
2055        // Check that the current committee members are authorized validators.
2056        for i in 0..certificates.clone().len() {
2057            let is_authorized = gateway.is_authorized_validator_address(certificates[i].author());
2058            if i < committee_size {
2059                assert!(is_authorized);
2060            } else {
2061                assert!(!is_authorized);
2062            }
2063        }
2064    }
2065}