Skip to main content

snarkos_node_bft/
gateway.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "telemetry")]
17use crate::helpers::Telemetry;
18use crate::{
19    CONTEXT,
20    MAX_BATCH_DELAY_IN_MS,
21    MEMORY_POOL_PORT,
22    Worker,
23    events::{DisconnectReason, EventCodec, PrimaryPing},
24    helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
25    spawn_blocking,
26};
27use smol_str::SmolStr;
28use snarkos_account::Account;
29use snarkos_node_bft_events::{
30    BlockRequest,
31    BlockResponse,
32    CertificateRequest,
33    CertificateResponse,
34    ChallengeRequest,
35    ChallengeResponse,
36    DataBlocks,
37    Event,
38    EventTrait,
39    TransmissionRequest,
40    TransmissionResponse,
41    ValidatorsRequest,
42    ValidatorsResponse,
43};
44use snarkos_node_bft_ledger_service::LedgerService;
45use snarkos_node_network::{
46    ConnectionMode,
47    NodeType,
48    Peer,
49    PeerPoolHandling,
50    Resolver,
51    bootstrap_peers,
52    get_repo_commit_hash,
53    log_repo_sha_comparison,
54    shorten_snarkos_sha,
55};
56use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
57use snarkos_node_tcp::{
58    Config,
59    ConnectError,
60    Connection,
61    ConnectionSide,
62    P2P,
63    Tcp,
64    protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
65};
66use snarkos_utilities::NodeDataDir;
67use snarkvm::{
68    console::prelude::*,
69    ledger::{
70        committee::Committee,
71        narwhal::{BatchHeader, Data},
72    },
73    prelude::{Address, Field},
74    utilities::flatten_error,
75};
76
77use colored::Colorize;
78use futures::{SinkExt, future::join_all};
79use indexmap::IndexMap;
80#[cfg(feature = "locktick")]
81use locktick::parking_lot::{Mutex, RwLock};
82#[cfg(not(feature = "locktick"))]
83use parking_lot::{Mutex, RwLock};
84use rand::{
85    rngs::OsRng,
86    seq::{IteratorRandom, SliceRandom},
87};
88use std::{
89    collections::{HashMap, HashSet},
90    future::Future,
91    io,
92    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
93    sync::Arc,
94    time::Duration,
95};
96use tokio::{
97    net::TcpStream,
98    sync::{OnceCell, oneshot},
99    task::{self, JoinHandle},
100};
101use tokio_stream::StreamExt;
102use tokio_util::codec::Framed;
103
/// The maximum interval of events to cache.
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
/// The maximum interval of requests to cache.
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds

/// The maximum number of connection attempts in an interval.
/// Not compiled under `cfg(test)`, where connection-attempt limiting is not enforced.
#[cfg(not(test))]
const MAX_CONNECTION_ATTEMPTS: usize = 10;

/// The maximum number of validators to send in a validators response event.
pub const MAX_VALIDATORS_TO_SEND: usize = 200;

/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(test))]
const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;

/// The amount of time an IP address is prohibited from connecting (5 minutes).
const IP_BAN_TIME_IN_SECS: u64 = 300;
122
/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
pub trait Transport<N: Network>: Send + Sync {
    /// Sends the given event to the specified peer.
    ///
    /// Returns as soon as the event is queued; the returned receiver (if any)
    /// resolves with the delivery result.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
    /// Broadcasts the given event.
    fn broadcast(&self, event: Event<N>);
}
130
/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
///
/// Cloning is cheap: a `Gateway` is an `Arc` handle around [`InnerGateway`].
#[derive(Clone)]
pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);
135
impl<N: Network> Deref for Gateway<N> {
    type Target = Arc<InnerGateway<N>>;

    /// Dereferences to the shared inner state, so [`InnerGateway`] fields and
    /// methods can be accessed directly on a `Gateway`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
143
pub struct InnerGateway<N: Network> {
    /// The account of the node.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The TCP stack.
    tcp: Tcp,
    /// The cache.
    cache: Cache<N>,
    /// The resolver.
    resolver: RwLock<Resolver<N>>,
    /// The collection of both candidate and connected peers.
    peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
    /// Telemetry collected about validators (only with the `telemetry` feature).
    #[cfg(feature = "telemetry")]
    validator_telemetry: Telemetry<N>,
    /// The primary sender.
    primary_sender: OnceCell<PrimarySender<N>>,
    /// The worker senders.
    worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
    /// The sync sender.
    sync_sender: OnceCell<SyncSender<N>>,
    /// The spawned handles.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// The node's data directory (used e.g. for the gateway peer cache).
    node_data_dir: NodeDataDir,
    /// If the flag is set, the node will only connect to trusted peers.
    trusted_peers_only: bool,
    /// The development mode.
    dev: Option<u16>,
}
176
impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
    /// The maximum number of peers (candidate and connected) kept in the pool.
    const MAXIMUM_POOL_SIZE: usize = 200;
    /// The owner label for this peer pool (the gateway's logging context).
    const OWNER: &str = CONTEXT;
    /// NOTE(review): presumably the misbehavior threshold before a peer is slashed
    /// from the pool — confirm against the `PeerPoolHandling` trait docs.
    const PEER_SLASHING_COUNT: usize = 20;

    /// Returns the shared pool of candidate and connected peers.
    fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
        &self.peer_pool
    }

    /// Returns the address resolver.
    fn resolver(&self) -> &RwLock<Resolver<N>> {
        &self.resolver
    }

    /// Returns `true` if the node is running in development mode.
    fn is_dev(&self) -> bool {
        self.dev.is_some()
    }

    /// Returns `true` if the node only connects to trusted peers.
    fn trusted_peers_only(&self) -> bool {
        self.trusted_peers_only
    }

    /// The gateway always identifies as a validator.
    fn node_type(&self) -> NodeType {
        NodeType::Validator
    }
}
202
impl<N: Network> Gateway<N> {
    /// Initializes a new gateway.
    ///
    /// If `ip` is `None`, the listener address defaults to the memory pool port:
    /// localhost offset by the dev ID in development mode, or all interfaces otherwise.
    /// The initial peer pool is seeded from the on-disk peer cache (unless
    /// `trusted_peers_only` is set) plus the given `trusted_validators`.
    ///
    /// # Errors
    /// Returns an error if the cached peers cannot be loaded from disk.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        node_data_dir: NodeDataDir,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway IP.
        let ip = match (ip, dev) {
            (None, Some(dev)) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, MEMORY_POOL_PORT + dev)),
            (None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
            (Some(ip), _) => ip,
        };
        // Initialize the TCP stack.
        //
        // The 10x multiplier allows for more TCP connections than the maximum
        // committee size to prevent "connection refused" errors when two nodes
        // simultaneously attempt to connect to each other. Note that later,
        // during handshake, the Gateway applies its own limit to the number of
        // active connections and removes duplicates.
        let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size() * 10));

        // Prepare the collection of the initial peers.
        let mut initial_peers = HashMap::new();

        // Load entries from the validator cache (if present and if we are not in trusted peers only mode).
        if !trusted_peers_only {
            let cached_peers = Self::load_cached_peers(&node_data_dir.gateway_peer_cache_path())?;
            for addr in cached_peers {
                initial_peers.insert(addr, Peer::new_candidate(addr, false));
            }
        }

        // Add the trusted peers to the list of the initial peers; this may promote
        // some of the cached validators to trusted ones.
        initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));

        // Return the gateway.
        Ok(Self(Arc::new(InnerGateway {
            account,
            storage,
            ledger,
            tcp,
            cache: Default::default(),
            resolver: Default::default(),
            peer_pool: RwLock::new(initial_peers),
            #[cfg(feature = "telemetry")]
            validator_telemetry: Default::default(),
            primary_sender: Default::default(),
            worker_senders: Default::default(),
            sync_sender: Default::default(),
            handles: Default::default(),
            node_data_dir,
            trusted_peers_only,
            dev,
        })))
    }

    /// Run the gateway.
    ///
    /// Wires up the primary/worker/sync channels, enables the TCP protocol
    /// handlers, starts the listener, and initializes the heartbeat.
    ///
    /// # Panics
    /// Panics if any of the senders were already set (i.e. if `run` is called twice),
    /// or if the TCP listener cannot be enabled.
    pub async fn run(
        &self,
        primary_sender: PrimarySender<N>,
        worker_senders: IndexMap<u8, WorkerSender<N>>,
        sync_sender: Option<SyncSender<N>>,
    ) {
        debug!("Starting the gateway for the memory pool...");

        // Set the primary sender.
        self.primary_sender.set(primary_sender).expect("Primary sender already set in gateway");

        // Set the worker senders.
        self.worker_senders.set(worker_senders).expect("The worker senders are already set");

        // If the sync sender was provided, set the sync sender.
        if let Some(sync_sender) = sync_sender {
            self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
        }

        // Enable the TCP protocols.
        self.enable_handshake().await;
        self.enable_reading().await;
        self.enable_writing().await;
        self.enable_disconnect().await;
        self.enable_on_connect().await;

        // Spawn a loop for periodic metrics.
        #[cfg(feature = "metrics")]
        {
            let gateway = self.clone();
            self.spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    gateway.update_metrics();
                }
            });
        }

        // Enable the TCP listener. Note: This must be called after the above protocols.
        let listen_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener");
        debug!("Listening for validator connections at address {listen_addr:?}");

        // Initialize the heartbeat.
        self.initialize_heartbeat();

        info!("Started the gateway for the memory pool at '{}'", self.local_ip());
    }
}
316
317// Dynamic rate limiting.
318impl<N: Network> Gateway<N> {
319    /// The current maximum committee size.
320    fn max_committee_size(&self) -> usize {
321        self.ledger
322            .current_committee()
323            .map_or_else(|_e| Committee::<N>::max_committee_size() as usize, |committee| committee.num_members())
324    }
325
326    /// The maximum number of events to cache.
327    fn max_cache_events(&self) -> usize {
328        self.max_cache_transmissions()
329    }
330
331    /// The maximum number of certificate requests to cache.
332    fn max_cache_certificates(&self) -> usize {
333        2 * BatchHeader::<N>::MAX_GC_ROUNDS * self.max_committee_size()
334    }
335
336    /// The maximum number of transmission requests to cache.
337    fn max_cache_transmissions(&self) -> usize {
338        self.max_cache_certificates() * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
339    }
340
341    /// The maximum number of duplicates for any particular request.
342    fn max_cache_duplicates(&self) -> usize {
343        self.max_committee_size().pow(2)
344    }
345}
346
#[async_trait]
impl<N: Network> CommunicationService for Gateway<N> {
    /// The message type.
    type Message = Event<N>;

    /// Prepares a block request to be sent.
    ///
    /// `end_height` is exclusive; the range must be non-empty (checked only in debug builds).
    fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
        debug_assert!(start_height < end_height, "Invalid block request format");
        Event::BlockRequest(BlockRequest { start_height, end_height })
    }

    /// Sends the given message to specified peer.
    ///
    /// This function returns as soon as the message is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the message has been delivered.
    async fn send(&self, peer_ip: SocketAddr, message: Self::Message) -> Option<oneshot::Receiver<io::Result<()>>> {
        // Delegate to the `Transport` implementation.
        Transport::send(self, peer_ip, message).await
    }
}
367
368impl<N: Network> Gateway<N> {
    /// Returns the account of the node.
    pub fn account(&self) -> &Account<N> {
        &self.account
    }
373
    /// Returns the dev identifier of the node.
    ///
    /// `Some(id)` when running in development mode; `None` otherwise.
    pub fn dev(&self) -> Option<u16> {
        self.dev
    }
378
    /// Returns the resolver.
    ///
    /// Note: this inherent method shadows `PeerPoolHandling::resolver`,
    /// which returns the same field.
    pub fn resolver(&self) -> &RwLock<Resolver<N>> {
        &self.resolver
    }
383
384    /// Returns the listener IP address from the (ambiguous) peer address.
385    pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
386        self.resolver.read().get_listener(*connected_addr)
387    }
388
    /// Returns the validator telemetry.
    /// Only available when the `telemetry` feature is enabled.
    #[cfg(feature = "telemetry")]
    pub fn validator_telemetry(&self) -> &Telemetry<N> {
        &self.validator_telemetry
    }
394
    /// Returns the primary sender.
    ///
    /// # Panics
    /// Panics if called before `run` has set the primary sender.
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set in gateway")
    }
399
400    /// Returns the number of workers.
401    pub fn num_workers(&self) -> u8 {
402        u8::try_from(self.worker_senders.get().expect("Missing worker senders in gateway").len())
403            .expect("Too many workers")
404    }
405
406    /// Returns the worker sender for the given worker ID.
407    pub fn get_worker_sender(&self, worker_id: u8) -> Option<&WorkerSender<N>> {
408        self.worker_senders.get().and_then(|senders| senders.get(&worker_id))
409    }
410
411    /// Returns `true` if the given peer IP is an authorized validator.
412    pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool {
413        // If the peer IP is in the trusted validators, return early.
414        if self.trusted_peers().contains(&ip) {
415            return true;
416        }
417        // Retrieve the Aleo address of the peer IP.
418        match self.resolve_to_aleo_addr(ip) {
419            // Determine if the peer IP is an authorized validator.
420            Some(address) => self.is_authorized_validator_address(address),
421            None => {
422                warn!("{CONTEXT} Could not resolve the Aleo address for '{ip}'");
423                false
424            }
425        }
426    }
427
428    /// Returns `true` if the given address is an authorized validator.
429    pub fn is_authorized_validator_address(&self, validator_address: Address<N>) -> bool {
430        // Determine if the validator address is a member of the committee lookback,
431        // the current committee, or the previous committee lookbacks.
432        // We allow leniency in this validation check in order to accommodate these two scenarios:
433        //  1. New validators should be able to connect immediately once bonded as a committee member.
434        //  2. Existing validators must remain connected until they are no longer bonded as a committee member.
435        //     (i.e. meaning they must stay online until the next block has been produced)
436
437        // Determine if the validator is in the current committee with lookback.
438        if self
439            .ledger
440            .get_committee_lookback_for_round(self.storage.current_round())
441            .is_ok_and(|committee| committee.is_committee_member(validator_address))
442        {
443            return true;
444        }
445
446        // Determine if the validator is in the latest committee on the ledger.
447        if self.ledger.current_committee().is_ok_and(|committee| committee.is_committee_member(validator_address)) {
448            return true;
449        }
450
451        // Retrieve the previous block height to consider from the sync tolerance.
452        let previous_block_height = self.ledger.latest_block_height().saturating_sub(MAX_BLOCKS_BEHIND);
453        // Determine if the validator is in any of the previous committee lookbacks.
454        match self.ledger.get_block_round(previous_block_height) {
455            Ok(block_round) => (block_round..self.storage.current_round()).step_by(2).any(|round| {
456                self.ledger
457                    .get_committee_lookback_for_round(round)
458                    .is_ok_and(|committee| committee.is_committee_member(validator_address))
459            }),
460            Err(_) => false,
461        }
462    }
463
464    /// Returns the list of connected addresses.
465    pub fn connected_addresses(&self) -> HashSet<Address<N>> {
466        self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect()
467    }
468
469    /// Ensure the peer is allowed to connect.
470    fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<(), DisconnectReason> {
471        // Ensure the peer IP is not this node.
472        if self.is_local_ip(listener_addr) {
473            return Err(DisconnectReason::SelfConnect);
474        }
475
476        Ok(())
477    }
478
479    /// Updates the connection metrics for the gateway.
480    #[cfg(feature = "metrics")]
481    fn update_metrics(&self) {
482        metrics::gauge(metrics::bft::CONNECTED, self.number_of_connected_peers() as f64);
483        metrics::gauge(metrics::bft::CONNECTING, self.number_of_connecting_peers() as f64);
484    }
485
486    /// Inserts the given peer into the connected peers. This is only used in testing.
487    #[cfg(test)]
488    pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
489        // Adds a bidirectional map between the listener address and (ambiguous) peer address.
490        self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address));
491        // Add a transmission for this peer in the connected peers.
492        self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false));
493        if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
494            peer.upgrade_to_connected(
495                peer_addr,
496                peer_ip.port(),
497                address,
498                NodeType::Validator,
499                0,
500                get_repo_commit_hash(),
501                ConnectionMode::Gateway,
502            );
503        }
504    }
505
506    /// Sends the given event to specified peer.
507    ///
508    /// This function returns as soon as the event is queued to be sent,
509    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
510    /// which can be used to determine when and whether the event has been delivered.
511    fn send_inner(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
512        // Resolve the listener IP to the (ambiguous) peer address.
513        let Some(peer_addr) = self.resolve_to_ambiguous(peer_ip) else {
514            warn!("Unable to resolve the listener IP address '{peer_ip}'");
515            return None;
516        };
517        // Retrieve the event name.
518        let name = event.name();
519        // Send the event to the peer.
520        trace!("{CONTEXT} Sending '{name}' to '{peer_ip}'");
521        let result = self.unicast(peer_addr, event);
522        // If the event was unable to be sent, disconnect.
523        if let Err(err) = &result {
524            warn!("{CONTEXT} Failed to send '{name}' to '{peer_ip}': {err:?}");
525            debug!("{CONTEXT} Disconnecting from '{peer_ip}' (unable to send)");
526            self.disconnect(peer_ip);
527        }
528        result.ok()
529    }
530
531    /// Handles the inbound event from the peer. The returned value indicates whether
532    /// the connection is still active, and errors cause a disconnect once they are
533    /// propagated to the caller.
534    async fn inbound(&self, peer_addr: SocketAddr, event: Event<N>) -> Result<bool> {
535        // Retrieve the listener IP for the peer.
536        let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else {
537            // No longer connected to the peer.
538            trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name());
539            return Ok(false);
540        };
541        // Ensure that the peer is an authorized committee member or a bootstrapper.
542        if !(self.is_authorized_validator_ip(peer_ip)
543            || self
544                .get_connected_peer(peer_ip)
545                .map(|peer| peer.node_type == NodeType::BootstrapClient)
546                .unwrap_or(false))
547        {
548            bail!("{CONTEXT} Dropping '{}' from '{peer_ip}' (not authorized)", event.name())
549        }
550        // Drop the peer, if they have exceeded the rate limit (i.e. they are requesting too much from us).
551        let num_events = self.cache.insert_inbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
552        if num_events >= self.max_cache_events() {
553            bail!("Dropping '{peer_ip}' for spamming events (num_events = {num_events})")
554        }
555        // Rate limit for duplicate requests.
556        match event {
557            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
558                // Retrieve the certificate ID.
559                let certificate_id = match &event {
560                    Event::CertificateRequest(CertificateRequest { certificate_id }) => *certificate_id,
561                    Event::CertificateResponse(CertificateResponse { certificate }) => certificate.id(),
562                    _ => unreachable!(),
563                };
564                // Skip processing this certificate if the rate limit was exceed (i.e. someone is spamming a specific certificate).
565                let num_events = self.cache.insert_inbound_certificate(certificate_id, CACHE_REQUESTS_INTERVAL);
566                if num_events >= self.max_cache_duplicates() {
567                    return Ok(true);
568                }
569            }
570            Event::TransmissionRequest(TransmissionRequest { transmission_id })
571            | Event::TransmissionResponse(TransmissionResponse { transmission_id, .. }) => {
572                // Skip processing this certificate if the rate limit was exceeded (i.e. someone is spamming a specific certificate).
573                let num_events = self.cache.insert_inbound_transmission(transmission_id, CACHE_REQUESTS_INTERVAL);
574                if num_events >= self.max_cache_duplicates() {
575                    return Ok(true);
576                }
577            }
578            Event::BlockRequest(_) => {
579                let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
580                if num_events >= self.max_cache_duplicates() {
581                    return Ok(true);
582                }
583            }
584            _ => {}
585        }
586        trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());
587
588        // This match statement handles the inbound event by deserializing the event,
589        // checking the event is valid, and then calling the appropriate (trait) handler.
590        match event {
591            Event::BatchPropose(batch_propose) => {
592                // Send the batch propose to the primary.
593                let _ = self.primary_sender().tx_batch_propose.send((peer_ip, batch_propose)).await;
594                Ok(true)
595            }
596            Event::BatchSignature(batch_signature) => {
597                // Send the batch signature to the primary.
598                let _ = self.primary_sender().tx_batch_signature.send((peer_ip, batch_signature)).await;
599                Ok(true)
600            }
601            Event::BatchCertified(batch_certified) => {
602                // Send the batch certificate to the primary.
603                let _ = self.primary_sender().tx_batch_certified.send((peer_ip, batch_certified.certificate)).await;
604                Ok(true)
605            }
606            Event::BlockRequest(block_request) => {
607                let BlockRequest { start_height, end_height } = block_request;
608
609                // Ensure the block request is well-formed.
610                if start_height >= end_height {
611                    bail!("Block request from '{peer_ip}' has an invalid range ({start_height}..{end_height})")
612                }
613                // Ensure that the block request is within the allowed bounds.
614                if end_height - start_height > DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32 {
615                    bail!("Block request from '{peer_ip}' has an excessive range ({start_height}..{end_height})")
616                }
617
618                // End height is exclusive.
619                let latest_consensus_version = N::CONSENSUS_VERSION(end_height - 1)?;
620
621                let self_ = self.clone();
622                let blocks = match task::spawn_blocking(move || {
623                    // Retrieve the blocks within the requested range.
624                    match self_.ledger.get_blocks(start_height..end_height) {
625                        Ok(blocks) => Ok(DataBlocks(blocks)),
626                        Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
627                    }
628                })
629                .await
630                {
631                    Ok(Ok(blocks)) => blocks,
632                    Ok(Err(error)) => return Err(error),
633                    Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
634                };
635
636                let self_ = self.clone();
637                tokio::spawn(async move {
638                    // Send the `BlockResponse` message to the peer.
639                    let event =
640                        Event::BlockResponse(BlockResponse::new(block_request, blocks, latest_consensus_version));
641                    Transport::send(&self_, peer_ip, event).await;
642                });
643                Ok(true)
644            }
645            Event::BlockResponse(BlockResponse { request, latest_consensus_version, blocks, .. }) => {
646                // Process the block response. Except for some tests, there is always a sync sender.
647                if let Some(sync_sender) = self.sync_sender.get() {
648                    // Check the response corresponds to a request.
649                    if !self.cache.remove_outbound_block_request(peer_ip, &request) {
650                        bail!("Unsolicited block response from '{peer_ip}'")
651                    }
652
653                    // Perform the deferred non-blocking deserialization of the blocks.
654                    // The deserialization can take a long time (minutes). We should not be running
655                    // this on a blocking task, but on a rayon thread pool.
656                    let (send, recv) = tokio::sync::oneshot::channel();
657                    rayon::spawn_fifo(move || {
658                        let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
659                        let _ = send.send(blocks);
660                    });
661                    let blocks = match recv.await {
662                        Ok(Ok(blocks)) => blocks,
663                        Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
664                        Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
665                    };
666
667                    // Ensure the block response is well-formed.
668                    blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
669                    // Send the blocks to the sync module.
670                    match sync_sender.insert_block_response(peer_ip, blocks.0, latest_consensus_version).await {
671                        Ok(_) => Ok(true),
672                        Err(err) if err.is_benign() => {
673                            let err: anyhow::Error = err.into();
674                            let err = err.context(format!("Ignoring block response from peer '{peer_ip}'"));
675                            debug!("{}", flatten_error(err));
676                            Ok(true)
677                        }
678                        Err(err) if err.is_invalid_consensus_version() => {
679                            let err: anyhow::Error = err.into();
680                            let err = err.context(format!("Peer sent an invalid block response '{peer_ip}'"));
681
682                            let msg = flatten_error(&err);
683                            error!("{msg}");
684                            self.ip_ban_peer(peer_ip, Some(&msg));
685                            Err(err)
686                        }
687                        Err(err) => {
688                            let err: anyhow::Error = err.into();
689                            let err = err.context(format!("Peer '{peer_ip}' sent an invalid block response"));
690                            warn!("{}", flatten_error(err));
691
692                            // TODO(kaimast): This needs more testing to ensure disconnect is the correct action.
693                            Ok(true)
694                        }
695                    }
696                } else {
697                    debug!("Ignoring block response from '{peer_ip}' - no sync sender");
698                    Ok(true)
699                }
700            }
701            Event::CertificateRequest(certificate_request) => {
702                // Send the certificate request to the sync module.
703                // Except for some tests, there is always a sync sender.
704                if let Some(sync_sender) = self.sync_sender.get() {
705                    // Send the certificate request to the sync module.
706                    let _ = sync_sender.tx_certificate_request.send((peer_ip, certificate_request)).await;
707                }
708                Ok(true)
709            }
710            Event::CertificateResponse(certificate_response) => {
711                // Send the certificate response to the sync module.
712                // Except for some tests, there is always a sync sender.
713                if let Some(sync_sender) = self.sync_sender.get() {
714                    // Send the certificate response to the sync module.
715                    let _ = sync_sender.tx_certificate_response.send((peer_ip, certificate_response)).await;
716                }
717                Ok(true)
718            }
719            Event::ChallengeRequest(..) | Event::ChallengeResponse(..) => {
720                // Disconnect as the peer is not following the protocol.
721                bail!("{CONTEXT} Peer '{peer_ip}' is not following the protocol")
722            }
723            Event::Disconnect(message) => {
724                // The peer informs us that they had disconnected. Disconnect from them too.
725                debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
726                self.disconnect(peer_ip);
727                Ok(false)
728            }
729            Event::PrimaryPing(ping) => {
730                let PrimaryPing { version, block_locators, primary_certificate } = ping;
731
732                // Ensure the event version is not outdated.
733                if version < Event::<N>::VERSION {
734                    bail!("Dropping '{peer_ip}' on event version {version} (outdated)");
735                }
736
737                // Log the validator's height.
738                debug!("Validator '{peer_ip}' is at height {}", block_locators.latest_locator_height());
739
740                // Update the peer locators. Except for some tests, there is always a sync sender.
741                if let Some(sync_sender) = self.sync_sender.get() {
742                    // Check the block locators are valid, and update the validators in the sync module.
743                    if let Err(error) = sync_sender.update_peer_locators(peer_ip, block_locators).await {
744                        bail!("Validator '{peer_ip}' sent invalid block locators - {error}");
745                    }
746                }
747
748                // Send the batch certificates to the primary.
749                let _ = self.primary_sender().tx_primary_ping.send((peer_ip, primary_certificate)).await;
750                Ok(true)
751            }
752            Event::TransmissionRequest(request) => {
753                // TODO (howardwu): Add rate limiting checks on this event, on a per-peer basis.
754                // Determine the worker ID.
755                let Ok(worker_id) = assign_to_worker(request.transmission_id, self.num_workers()) else {
756                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", request.transmission_id);
757                    return Ok(true);
758                };
759                // Send the transmission request to the worker.
760                if let Some(sender) = self.get_worker_sender(worker_id) {
761                    // Send the transmission request to the worker.
762                    let _ = sender.tx_transmission_request.send((peer_ip, request)).await;
763                }
764                Ok(true)
765            }
766            Event::TransmissionResponse(response) => {
767                // Determine the worker ID.
768                let Ok(worker_id) = assign_to_worker(response.transmission_id, self.num_workers()) else {
769                    warn!("{CONTEXT} Unable to assign transmission ID '{}' to a worker", response.transmission_id);
770                    return Ok(true);
771                };
772                // Send the transmission response to the worker.
773                if let Some(sender) = self.get_worker_sender(worker_id) {
774                    // Send the transmission response to the worker.
775                    let _ = sender.tx_transmission_response.send((peer_ip, response)).await;
776                }
777                Ok(true)
778            }
779            Event::ValidatorsRequest(_) => {
780                let mut connected_peers = self.get_best_connected_peers(Some(MAX_VALIDATORS_TO_SEND));
781                connected_peers.shuffle(&mut rand::thread_rng());
782
783                let self_ = self.clone();
784                tokio::spawn(async move {
785                    // Initialize the validators.
786                    let mut validators = IndexMap::with_capacity(MAX_VALIDATORS_TO_SEND);
787                    // Iterate over the validators.
788                    for validator in connected_peers.into_iter() {
789                        // Add the validator to the list of validators.
790                        validators.insert(validator.listener_addr, validator.aleo_addr);
791                    }
792                    // Send the validators response to the peer.
793                    let event = Event::ValidatorsResponse(ValidatorsResponse { validators });
794                    Transport::send(&self_, peer_ip, event).await;
795                });
796                Ok(true)
797            }
798            Event::ValidatorsResponse(response) => {
799                if self.trusted_peers_only {
800                    bail!("{CONTEXT} Not accepting validators response from '{peer_ip}' (trusted peers only)");
801                }
802                let ValidatorsResponse { validators } = response;
803                // Ensure the number of validators is not too large.
804                ensure!(validators.len() <= MAX_VALIDATORS_TO_SEND, "{CONTEXT} Received too many validators");
805                // Ensure the cache contains a validators request for this peer.
806                if !self.cache.contains_outbound_validators_request(peer_ip) {
807                    bail!("{CONTEXT} Received validators response from '{peer_ip}' without a validators request")
808                }
809                // Decrement the number of validators requests for this peer.
810                self.cache.decrement_outbound_validators_requests(peer_ip);
811
812                // Add valid validators as candidates to the peer pool; only validator-related
813                // filters need to be applied, the rest is handled by `PeerPoolHandling`.
814                let valid_addrs = validators
815                    .into_iter()
816                    .filter_map(|(listener_addr, aleo_addr)| {
817                        (self.account.address() != aleo_addr
818                            && !self.is_connected_address(aleo_addr)
819                            && self.is_authorized_validator_address(aleo_addr))
820                        .then_some((listener_addr, None))
821                    })
822                    .collect::<Vec<_>>();
823                if !valid_addrs.is_empty() {
824                    self.insert_candidate_peers(valid_addrs);
825                }
826
827                Ok(true)
828            }
829            Event::WorkerPing(ping) => {
830                // Ensure the number of transmissions is not too large.
831                ensure!(
832                    ping.transmission_ids.len() <= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER_PING,
833                    "{CONTEXT} Received too many transmissions"
834                );
835                // Retrieve the number of workers.
836                let num_workers = self.num_workers();
837                // Iterate over the transmission IDs.
838                for transmission_id in ping.transmission_ids.into_iter() {
839                    // Determine the worker ID.
840                    let Ok(worker_id) = assign_to_worker(transmission_id, num_workers) else {
841                        warn!("{CONTEXT} Unable to assign transmission ID '{transmission_id}' to a worker");
842                        continue;
843                    };
844                    // Send the transmission ID to the worker.
845                    if let Some(sender) = self.get_worker_sender(worker_id) {
846                        // Send the transmission ID to the worker.
847                        let _ = sender.tx_worker_ping.send((peer_ip, transmission_id)).await;
848                    }
849                }
850                Ok(true)
851            }
852        }
853    }
854
855    /// Initialize a new instance of the heartbeat.
856    fn initialize_heartbeat(&self) {
857        let self_clone = self.clone();
858        self.spawn(async move {
859            // Sleep briefly to ensure the other nodes are ready to connect.
860            tokio::time::sleep(Duration::from_millis(1000)).await;
861            info!("Starting the heartbeat of the gateway...");
862            loop {
863                // Process a heartbeat in the gateway.
864                self_clone.heartbeat().await;
865                // Sleep for the heartbeat interval.
866                tokio::time::sleep(Duration::from_secs(15)).await;
867            }
868        });
869    }
870
871    /// Spawns a task with the given future; it should only be used for long-running tasks.
872    #[allow(dead_code)]
873    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
874        self.handles.lock().push(tokio::spawn(future));
875    }
876
877    /// Shuts down the gateway.
878    pub async fn shut_down(&self) {
879        info!("Shutting down the gateway...");
880        // Save the best peers for future use.
881        if let Err(e) = self.save_best_peers(&self.node_data_dir.gateway_peer_cache_path(), None, true) {
882            warn!("Failed to persist best validators to disk: {e}");
883        }
884        // Abort the tasks.
885        self.handles.lock().iter().for_each(|handle| handle.abort());
886        // Close the listener.
887        self.tcp.shut_down().await;
888    }
889}
890
891impl<N: Network> Gateway<N> {
    /// The minimum time between connection attempts to a single peer; used by
    /// `handle_min_connected_validators` to rate-limit reconnection attempts.
    const MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS: Duration = Duration::from_secs(10);
    /// The uptime after which nodes log a warning about missing validator connections;
    /// before this grace period elapses, missing connections are logged at debug level only.
    const MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD: Duration = Duration::from_secs(60);
896
    /// Handles the heartbeat request.
    ///
    /// Invoked periodically by the task started in `initialize_heartbeat`; each step
    /// below performs one piece of connection maintenance.
    async fn heartbeat(&self) {
        // Log the connected validators.
        self.log_connected_validators();
        // Log the validator participation scores.
        #[cfg(feature = "telemetry")]
        self.log_participation_scores();
        // Keep the trusted validators connected.
        self.handle_trusted_validators();
        // Keep the bootstrap peers within the allowed range.
        self.handle_bootstrap_peers().await;
        // Removes any validators that are not in the current committee.
        self.handle_unauthorized_validators();
        // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
        self.handle_min_connected_validators().await;
        // Unban any addresses whose ban time has expired.
        self.handle_banned_ips();
        // Update the dynamic validator whitelist.
        self.update_validator_whitelist();
    }
917
918    /// Logs the connected validators.
919    fn log_connected_validators(&self) {
920        // Retrieve the connected validators and current committee.
921        // The gatway may also be connected to bootstrap clients, which we should not log as connected validators.
922        let connected_validators = self.filter_connected_peers(|peer| peer.node_type == NodeType::Validator);
923
924        let committee = match self.ledger.current_committee() {
925            Ok(c) => c,
926            Err(err) => {
927                error!("Failed to get current committee: {err}");
928                return;
929            }
930        };
931
932        // Resolve the total number of connectable validators.
933        let validators_total = committee.num_members().saturating_sub(1);
934        // Format the total validators message.
935        let total_validators = format!("(of {validators_total} bonded validators)").dimmed();
936        // Construct the connections message.
937        let connections_msg = match connected_validators.len() {
938            0 => "No connected validators".to_string(),
939            num_connected => format!("Connected to {num_connected} validators {total_validators}"),
940        };
941        info!("{connections_msg}");
942
943        // Collect the connected validator addresses and stake.
944        let mut connected_validator_addresses = HashSet::with_capacity(connected_validators.len());
945        let mut connected_validator_shas: HashMap<SmolStr, u64> = HashMap::with_capacity(connected_validators.len());
946        // Insert our sha.
947        let our_sha = shorten_snarkos_sha(&get_repo_commit_hash());
948        let our_stake = committee.get_stake(self.account.address());
949        connected_validator_shas.insert(our_sha.clone(), our_stake);
950        // Include our own address.
951        connected_validator_addresses.insert(self.account.address());
952        // Include and log the connected validators.
953        for peer in &connected_validators {
954            // Register the Aleo address.
955            let address = peer.aleo_addr;
956            connected_validator_addresses.insert(address);
957            // Register the snarkOS commit SHA and the associated stake.
958            let address_stake = committee.get_stake(address);
959            let short_peer_sha = shorten_snarkos_sha(&peer.snarkos_sha);
960            *connected_validator_shas.entry(short_peer_sha.clone()).or_default() += address_stake;
961
962            debug!(
963                "{}",
964                format!(
965                    "  Connected to: {} - {} (connection age {:?})",
966                    peer.listener_addr,
967                    peer.aleo_addr,
968                    peer.first_seen.elapsed()
969                )
970                .dimmed()
971            );
972        }
973
974        // Log how much of the stake uses our git commit hash.
975        if let Some(combined_stake) = connected_validator_shas.get(&our_sha) {
976            let percentage = *combined_stake as f64 / committee.total_stake() as f64 * 100.0;
977            debug!("{}", format!("  Combined stake @ {our_sha}: {percentage:.2}%").dimmed());
978            #[cfg(feature = "metrics")]
979            metrics::gauge(metrics::bft::CONNECTED_STAKE_WITH_MATCHING_SHA, percentage);
980        }
981
982        // Log the validators that are not connected.
983        let num_not_connected = validators_total.saturating_sub(connected_validators.len());
984        if num_not_connected > 0 && self.tcp().uptime() > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
985            // Cache the total stake for computing percentages.
986            let total_stake = committee.total_stake();
987            let total_stake_f64 = total_stake as f64;
988
989            // Collect the committee members.
990            let committee_members: HashSet<_> =
991                self.ledger.current_committee().map(|c| c.members().keys().copied().collect()).unwrap_or_default();
992
993            let not_connected_stake: u64 = committee_members
994                .difference(&connected_validator_addresses)
995                .map(|address| {
996                    let address_stake = committee.get_stake(*address);
997                    let address_stake_as_percentage =
998                        if total_stake == 0 { 0.0 } else { address_stake as f64 / total_stake_f64 * 100.0 };
999                    debug!(
1000                        "{}",
1001                        format!("  Not connected to {address} ({address_stake_as_percentage:.2}% of total stake)")
1002                            .dimmed()
1003                    );
1004                    address_stake
1005                })
1006                .sum();
1007
1008            let not_connected_stake_as_percentage =
1009                if total_stake == 0 { 0.0 } else { not_connected_stake as f64 / total_stake_f64 * 100.0 };
1010            warn!(
1011                "Not connected to {num_not_connected} validators {total_validators} ({not_connected_stake_as_percentage:.2}% of total stake not connected)"
1012            );
1013            #[cfg(feature = "metrics")]
1014            {
1015                let connected_stake_as_percentage = 100.0 - not_connected_stake_as_percentage;
1016                metrics::gauge(metrics::bft::CONNECTED_STAKE, connected_stake_as_percentage);
1017            }
1018        } else {
1019            #[cfg(feature = "metrics")]
1020            metrics::gauge(metrics::bft::CONNECTED_STAKE, 100.0);
1021        };
1022
1023        if !committee.is_quorum_threshold_reached(&connected_validator_addresses) {
1024            // Not being connected to a quorum of validators is begning during startup.
1025            if self.tcp().uptime() > Self::MISSING_VALIDATOR_CONNECTIONS_GRACE_PERIOD {
1026                error!("Not connected to a quorum of validators");
1027            } else {
1028                debug!("Not connected to a quorum of validators");
1029            }
1030        }
1031    }
1032
1033    // Logs the validator participation scores.
1034    #[cfg(feature = "telemetry")]
1035    fn log_participation_scores(&self) {
1036        if let Ok(current_committee) = self.ledger.current_committee() {
1037            // Retrieve the participation scores.
1038            let participation_scores = self.validator_telemetry().get_participation_scores(&current_committee);
1039            // Log the participation scores.
1040            debug!("Participation Scores (in the last {} rounds):", self.storage.max_gc_rounds());
1041            for (address, score) in participation_scores {
1042                debug!("{}", format!("  {address} - {score:.2}%").dimmed());
1043            }
1044        }
1045    }
1046
1047    /// This function attempts to connect to any disconnected trusted validators.
1048    fn handle_trusted_validators(&self) {
1049        let trusted_peers = self.trusted_peers();
1050
1051        // Attempt to re-establish connections with any trusted peer that is not connected already.
1052        let handles: Vec<JoinHandle<_>> = trusted_peers
1053            .iter()
1054            .filter_map(|validator_ip| {
1055                // Attempt to connect to the trusted validator.
1056                match self.connect(*validator_ip) {
1057                    Ok(hdl) => Some(hdl),
1058                    Err(ConnectError::SelfConnect { .. })
1059                    | Err(ConnectError::AlreadyConnected { .. })
1060                    | Err(ConnectError::AlreadyConnecting { .. }) => None,
1061                    Err(err) => {
1062                        warn!("Could not initiate connection to trusted validator at '{validator_ip}' - {err}");
1063                        None
1064                    }
1065                }
1066            })
1067            .collect();
1068
1069        if !handles.is_empty() {
1070            info!("Reconnnecting to {} out of {} trusted validators", handles.len(), trusted_peers.len());
1071        }
1072    }
1073
    /// This function keeps the number of bootstrap peers within the allowed range.
    ///
    /// Ensures at least one bootstrap peer is connected (unless in trusted-peers-only
    /// mode), and disconnects any surplus beyond a single bootstrap connection.
    async fn handle_bootstrap_peers(&self) {
        // Return early if we are in trusted peers only mode.
        if self.trusted_peers_only {
            return;
        }
        // Split the bootstrap peers into connected and candidate lists.
        let mut candidate_bootstrap = Vec::new();
        let connected_bootstrap = self.filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
        for bootstrap_ip in bootstrap_peers::<N>(self.is_dev()) {
            if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
                candidate_bootstrap.push(bootstrap_ip);
            }
        }
        // If there are not enough connected bootstrap peers, connect to more.
        if connected_bootstrap.is_empty() {
            // Initialize an RNG.
            let rng = &mut OsRng;
            // Attempt to connect to a single randomly-chosen bootstrap peer.
            if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
                match self.connect(peer_ip) {
                    Ok(hdl) => {
                        // Wait for the connection attempt to resolve before continuing.
                        let result = hdl.await;
                        if let Err(err) = result {
                            warn!("{CONTEXT} Failed to connect to bootstrap peer at '{peer_ip}' - {err}");
                        }
                    }
                    // Already connected/connecting is expected and needs no action.
                    Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => {}
                    Err(err) => {
                        warn!("{CONTEXT} Could not initiate connection to bootstrap peer at '{peer_ip}' - {err}")
                    }
                }
            }
        }
        // Determine if the node is connected to more bootstrap peers than allowed (more than one).
        let num_surplus = connected_bootstrap.len().saturating_sub(1);
        if num_surplus > 0 {
            // Initialize an RNG.
            let rng = &mut OsRng;
            // Proceed to send disconnect requests to randomly-chosen surplus bootstrap peers.
            for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
                info!("{CONTEXT} Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
                <Self as Transport<N>>::send(
                    self,
                    peer.listener_addr,
                    Event::Disconnect(DisconnectReason::NoReasonGiven.into()),
                )
                .await;
                // Disconnect from this peer.
                self.disconnect(peer.listener_addr);
            }
        }
    }
1127
1128    /// This function attempts to disconnect any validators that are not in the current committee.
1129    fn handle_unauthorized_validators(&self) {
1130        let self_ = self.clone();
1131        tokio::spawn(async move {
1132            // Retrieve the connected validators.
1133            let validators = self_.get_connected_peers();
1134            // Iterate over the validator IPs.
1135            for peer in validators {
1136                // Skip bootstrapper peers.
1137                if peer.node_type == NodeType::BootstrapClient {
1138                    continue;
1139                }
1140                // Disconnect any validator that is not in the current committee.
1141                if !self_.is_authorized_validator_ip(peer.listener_addr) {
1142                    warn!(
1143                        "{CONTEXT} Disconnecting from '{}' - Validator is not in the current committee",
1144                        peer.listener_addr
1145                    );
1146                    Transport::send(&self_, peer.listener_addr, DisconnectReason::ProtocolViolation.into()).await;
1147                    // Disconnect from this peer.
1148                    self_.disconnect(peer.listener_addr);
1149                }
1150            }
1151        });
1152    }
1153
    /// This function sends a `ValidatorsRequest` to a random validator,
    /// if the number of connected validators is less than the minimum.
    /// It also attempts to connect to known unconnected validators.
    async fn handle_min_connected_validators(&self) {
        // Attempt to connect to untrusted validators we're not connected to yet.
        // The trusted ones are already handled by `handle_trusted_validators`.
        let trusted_validators = self.trusted_peers();
        if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES() as usize {
            let (addrs, handles): (Vec<_>, Vec<_>) = self
                .get_candidate_peers()
                .iter()
                .filter_map(|peer| {
                    // Skip trusted validators; they are reconnected elsewhere.
                    if trusted_validators.contains(&peer.listener_addr) {
                        return None;
                    }

                    // Rate-limit connection attempts to each individual peer.
                    if let Some(previous_attempt) = peer.last_connection_attempt
                        && previous_attempt.elapsed() < Self::MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS
                    {
                        return None;
                    }

                    match self.connect(peer.listener_addr) {
                        Ok(hdl) => Some((peer.listener_addr, hdl)),
                        // Expected outcomes that require no action or logging.
                        Err(ConnectError::AlreadyConnected { .. })
                        | Err(ConnectError::AlreadyConnecting { .. })
                        | Err(ConnectError::SelfConnect { .. }) => None,
                        Err(err) => {
                            warn!(
                                "{CONTEXT} Could not initiate connection to validator at '{}' - {err}",
                                peer.listener_addr
                            );
                            None
                        }
                    }
                })
                .unzip();

            // Await all connection attempts concurrently and log any failures.
            for (addr, result) in addrs.into_iter().zip(join_all(handles).await) {
                if let Err(err) = result {
                    warn!("{CONTEXT} Failed to connect to validator at '{addr}' - {err}");
                }
            }

            // Retrieve the connected validators.
            let validators = self.connected_peers();
            // If there are no validator IPs to connect to, return early.
            if validators.is_empty() {
                return;
            }
            // Select a random validator IP.
            if let Some(validator_ip) = validators.into_iter().choose(&mut rand::thread_rng()) {
                let self_ = self.clone();
                tokio::spawn(async move {
                    // Increment the number of outbound validators requests for this validator,
                    // so the eventual response passes the cache check in the inbound handler.
                    self_.cache.increment_outbound_validators_requests(validator_ip);
                    // Send a `ValidatorsRequest` to the validator.
                    let _ = Transport::send(&self_, validator_ip, Event::ValidatorsRequest(ValidatorsRequest)).await;
                });
            }
        }
    }
1216
1217    /// Processes a message received from the network.
1218    async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
1219        // Process the message. Disconnect if the peer violated the protocol.
1220        if let Err(error) = self.inbound(peer_addr, message).await
1221            && let Some(peer_ip) = self.resolver.read().get_listener(peer_addr)
1222        {
1223            warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
1224            let self_ = self.clone();
1225            tokio::spawn(async move {
1226                Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
1227                // Disconnect from this peer.
1228                self_.disconnect(peer_ip);
1229            });
1230        }
1231    }
1232
    // Remove addresses whose ban time has expired.
    //
    // Called once per heartbeat; `IP_BAN_TIME_IN_SECS` is the duration after which a ban lapses.
    fn handle_banned_ips(&self) {
        self.tcp.banned_peers().remove_old_bans(IP_BAN_TIME_IN_SECS);
    }
1237
1238    // Update the dynamic validator whitelist.
1239    fn update_validator_whitelist(&self) {
1240        if let Err(err) =
1241            self.save_best_peers(&self.node_data_dir.validator_whitelist_path(), Some(MAX_VALIDATORS_TO_SEND), false)
1242        {
1243            warn!("{CONTEXT} Could not update the validator whitelist: {err}");
1244        }
1245    }
1246}
1247
#[async_trait]
impl<N: Network> Transport<N> for Gateway<N> {
    /// Sends the given event to specified peer.
    ///
    /// This method is rate limited to prevent spamming the peer.
    ///
    /// This function returns as soon as the event is queued to be sent,
    /// without waiting for the actual delivery; instead, the caller is provided with a [`oneshot::Receiver`]
    /// which can be used to determine when and whether the event has been delivered.
    async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>> {
        // `$cache_map` counts outbound events of a given kind within `$interval`,
        // and `$freq` is the per-peer cap for that kind of event.
        macro_rules! send {
            ($self:ident, $cache_map:ident, $interval:expr, $freq:ident) => {{
                // Rate limit the number of events of this kind sent to the peer,
                // by spin-waiting until the count drops below the cap.
                while $self.cache.$cache_map(peer_ip, $interval) > $self.$freq() {
                    // Sleep for a short period of time to allow the cache to clear.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                // Send the event to the peer.
                $self.send_inner(peer_ip, event)
            }};
        }

        // Increment the cache for certificate, transmission and block events.
        match event {
            Event::CertificateRequest(_) | Event::CertificateResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer.
                send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
            }
            Event::TransmissionRequest(_) | Event::TransmissionResponse(_) => {
                // Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
                self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
                // Send the event to the peer.
                send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
            }
            Event::BlockRequest(request) => {
                // Insert the outbound request so we can match it to responses.
                self.cache.insert_outbound_block_request(peer_ip, request);
                // Send the event to the peer and update the outbound event cache, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
            _ => {
                // Send the event to the peer, use the general rate limit.
                send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
            }
        }
    }

    /// Broadcasts the given event to all connected peers.
    ///
    /// The sends happen on a spawned task; delivery is best-effort and results are ignored.
    // TODO(ljedrz): the event should be checked for the presence of Data::Object, and
    // serialized in advance if it's there.
    fn broadcast(&self, event: Event<N>) {
        // Ensure there are connected peers.
        if self.number_of_connected_peers() > 0 {
            let self_ = self.clone();
            let connected_peers = self.connected_peers();
            tokio::spawn(async move {
                // Iterate through all connected peers.
                for peer_ip in connected_peers {
                    // Send the event to the peer.
                    let _ = Transport::send(&self_, peer_ip, event.clone()).await;
                }
            });
        }
    }
}
1315
impl<N: Network> P2P for Gateway<N> {
    /// Returns a reference to the underlying TCP instance.
    fn tcp(&self) -> &Tcp {
        &self.tcp
    }
}
1322
#[async_trait]
impl<N: Network> Reading for Gateway<N> {
    type Codec = EventCodec<N>;
    type Message = Event<N>;

    /// Creates a [`Decoder`] used to interpret messages from the network.
    /// The `side` param indicates the connection side **from the node's perspective**.
    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
        Default::default()
    }

    /// Processes a message received from the network.
    async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
        if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
            let self_ = self.clone();
            // Handle BlockRequest and BlockResponse messages in a separate task to not block the
            // inbound queue.
            tokio::spawn(async move {
                self_.process_message_inner(peer_addr, message).await;
            });
        } else {
            self.process_message_inner(peer_addr, message).await;
        }
        Ok(())
    }

    /// Computes the depth of per-connection queues used to process inbound messages, sufficient to process the maximum expected load at any given moment.
    /// The greater it is, the more inbound messages the node can enqueue, but a too large value can make the node more susceptible to DoS attacks.
    fn message_queue_depth(&self) -> usize {
        2 * BatchHeader::<N>::MAX_GC_ROUNDS
            * N::LATEST_MAX_CERTIFICATES() as usize
            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
    }
}
1357
1358#[async_trait]
1359impl<N: Network> Writing for Gateway<N> {
1360    type Codec = EventCodec<N>;
1361    type Message = Event<N>;
1362
1363    /// Creates an [`Encoder`] used to write the outbound messages to the target stream.
1364    /// The `side` parameter indicates the connection side **from the node's perspective**.
1365    fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec {
1366        Default::default()
1367    }
1368
1369    /// Computes the depth of per-connection queues used to send outbound messages, sufficient to process the maximum expected load at any givent moment.
1370    /// The greater it is, the more outbound messages the node can enqueue. A too large value large value might obscure potential issues with your implementation
1371    /// (like slow serialization) or network.
1372    fn message_queue_depth(&self) -> usize {
1373        2 * BatchHeader::<N>::MAX_GC_ROUNDS
1374            * N::LATEST_MAX_CERTIFICATES() as usize
1375            * BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH
1376    }
1377}
1378
1379#[async_trait]
1380impl<N: Network> Disconnect for Gateway<N> {
1381    /// Any extra operations to be performed during a disconnect.
1382    async fn handle_disconnect(&self, peer_addr: SocketAddr) {
1383        if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
1384            self.downgrade_peer_to_candidate(peer_ip);
1385            // Remove the peer from the sync module. Except for some tests, there is always a sync sender.
1386            if let Some(sync_sender) = self.sync_sender.get() {
1387                let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
1388                tokio::spawn(async move {
1389                    if let Err(err) = tx_block_sync_remove_peer_.send(peer_ip).await {
1390                        warn!("{CONTEXT} Unable to remove '{peer_ip}' from the sync module - {err}");
1391                    }
1392                });
1393            }
1394            // We don't clear this map based on time but only on peer disconnect.
1395            // This is sufficient to avoid infinite growth as the committee has a fixed number
1396            // of members.
1397            self.cache.clear_outbound_validators_requests(peer_ip);
1398            self.cache.clear_outbound_block_requests(peer_ip);
1399        }
1400    }
1401}
1402
1403#[async_trait]
1404impl<N: Network> OnConnect for Gateway<N> {
1405    async fn on_connect(&self, peer_addr: SocketAddr) {
1406        if let Some(listener_addr) = self.resolve_to_listener(&peer_addr) {
1407            if let Some(peer) = self.get_connected_peer(listener_addr) {
1408                if peer.node_type == NodeType::BootstrapClient {
1409                    self.cache.increment_outbound_validators_requests(listener_addr);
1410                    let _ =
1411                        <Self as Transport<N>>::send(self, listener_addr, Event::ValidatorsRequest(ValidatorsRequest))
1412                            .await;
1413                }
1414            }
1415        }
1416    }
1417}
1418
#[async_trait]
impl<N: Network> Handshake for Gateway<N> {
    /// Performs the handshake protocol.
    ///
    /// Dispatches to the initiator or responder flow based on the connection side, then —
    /// if the peer's listening address is known — either promotes the peer to the connected
    /// state (on success) or downgrades it back to a candidate (on failure).
    async fn perform_handshake(&self, mut connection: Connection) -> Result<Connection, ConnectError> {
        // Perform the handshake.
        let peer_addr = connection.addr();
        let peer_side = connection.side();

        // Check (or impose) IP-level bans.
        // Ban checks are compiled out of tests, skipped in dev mode, and only apply to
        // inbound connections (where the peer is the initiator).
        #[cfg(not(test))]
        if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
            // If the IP is already banned reject the connection.
            if self.is_ip_banned(peer_addr.ip()) {
                trace!("{CONTEXT} Rejected a connection request from banned IP '{}'", peer_addr.ip());
                return Err(ConnectError::BannedIp { ip: peer_addr.ip() });
            }

            // Record this attempt; the cache returns the number of recent attempts from this IP.
            let num_attempts = self.cache.insert_inbound_connection(peer_addr.ip(), CONNECTION_ATTEMPTS_SINCE_SECS);

            debug!("Number of connection attempts from '{}': {}", peer_addr.ip(), num_attempts);
            if num_attempts > MAX_CONNECTION_ATTEMPTS {
                // Too many attempts within the window: ban the IP and reject this connection.
                self.update_ip_ban(peer_addr.ip());
                trace!("{CONTEXT} Rejected a consecutive connection request from IP '{}'", peer_addr.ip());
                return Err(ConnectError::other(anyhow!("'{}' appears to be spamming connections", peer_addr.ip())));
            }
        }

        let stream = self.borrow_stream(&mut connection);

        // If this is an inbound connection, we log it, but don't know the listening address yet.
        // Otherwise, we can immediately register the listening address.
        let mut listener_addr = if peer_side == ConnectionSide::Initiator {
            debug!("{CONTEXT} Received a connection request from '{peer_addr}'");
            None
        } else {
            debug!("{CONTEXT} Shaking hands with {peer_addr}...");
            Some(peer_addr)
        };

        // Retrieve the restrictions ID.
        let restrictions_id = self.ledger.latest_restrictions_id();

        // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
        // Note the inversion: if the peer is the responder, this node acts as the initiator, and vice versa.
        let handshake_result = if peer_side == ConnectionSide::Responder {
            self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await
        } else {
            self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await
        };

        // Post-process the result when the peer's listening address is known; for inbound
        // connections that failed before the ChallengeRequest arrived, it may still be None.
        if let Some(addr) = listener_addr {
            match handshake_result {
                Ok(ref cr) => {
                    // Classify the peer: known bootstrap peers are marked as such,
                    // every other gateway peer is treated as a validator.
                    let node_type = if bootstrap_peers::<N>(self.is_dev()).contains(&addr) {
                        NodeType::BootstrapClient
                    } else {
                        NodeType::Validator
                    };

                    // Register the address mappings and promote the peer to the connected state.
                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
                        self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address));
                        peer.upgrade_to_connected(
                            peer_addr,
                            cr.listener_port,
                            cr.address,
                            node_type,
                            cr.version,
                            cr.snarkos_sha,
                            ConnectionMode::Gateway,
                        );
                    }
                    info!("{CONTEXT} Connected to '{addr}'");
                }
                Err(error) => {
                    if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
                        // The peer may only be downgraded if it's a ConnectingPeer.
                        if peer.is_connecting() {
                            peer.downgrade_to_candidate(addr);
                        }
                    }
                    return Err(error);
                }
            }
        }

        Ok(connection)
    }
}
1506
/// A macro unwrapping the expected handshake event or returning an error for unexpected events.
///
/// `$event_ty` is the path of the expected `Event` variant; on success the variant's payload
/// is yielded as the macro's value. A `Disconnect`, an unexpected event, or a closed stream
/// all abort the handshake with a descriptive `ConnectError`.
macro_rules! expect_event {
    ($event_ty:path, $framed:expr, $peer_addr:expr) => {
        match $framed.try_next().await? {
            // Received the expected event, proceed.
            Some($event_ty(data)) => {
                trace!("{CONTEXT} Received '{}' from '{}'", data.name(), $peer_addr);
                data
            }
            // Received a disconnect event, abort.
            Some(Event::Disconnect($crate::events::Disconnect { reason })) => {
                return Err(ConnectError::other(format!("'{}' disconnected with reason \"{reason}\"", $peer_addr)));
            }
            // Received an unexpected event, abort.
            Some(ty) => {
                return Err(ConnectError::other(format!(
                    "'{}' did not follow the handshake protocol: received {:?} instead of {}",
                    $peer_addr,
                    ty.name(),
                    // Fix: this previously stringified `$msg_ty`, which is not a metavariable of
                    // this macro; unbound metavariables are transcribed verbatim, so the error
                    // message printed the literal text "$msg_ty" instead of the expected event type.
                    stringify!($event_ty),
                )));
            }
            // Received nothing.
            None => return Err(ConnectError::IoError(io::ErrorKind::BrokenPipe.into())),
        }
    };
}
1534
/// Send the given message to the peer.
// Used only during the handshake, where events are exchanged over a temporary `Framed` codec
// wrapped around the raw TCP stream (before the connection's regular codecs are installed).
async fn send_event<N: Network>(
    framed: &mut Framed<&mut TcpStream, EventCodec<N>>,
    peer_addr: SocketAddr,
    event: Event<N>,
) -> io::Result<()> {
    // Log the outbound event before it is encoded and flushed to the stream.
    trace!("{CONTEXT} Sending '{}' to '{peer_addr}'", event.name());
    framed.send(event).await
}
1544
impl<N: Network> Gateway<N> {
    /// The connection initiator side of the handshake.
    ///
    /// Sends our `ChallengeRequest`, expects the peer's `ChallengeResponse` followed by its own
    /// `ChallengeRequest`, verifies both, and finally answers with our `ChallengeResponse`.
    /// Returns the peer's `ChallengeRequest` on success.
    async fn handshake_inner_initiator<'a>(
        &'a self,
        peer_addr: SocketAddr,
        restrictions_id: Field<N>,
        stream: &'a mut TcpStream,
    ) -> Result<ChallengeRequest<N>, ConnectError> {
        // Introduce the peer into the peer pool.
        self.add_connecting_peer(peer_addr)?;

        // Construct the stream.
        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());

        // Initialize an RNG.
        let rng = &mut rand::rngs::OsRng;

        /* Step 1: Send the challenge request. */

        // Sample a random nonce.
        let our_nonce = rng.r#gen();
        // Determine the snarkOS SHA to send to the peer.
        let current_block_height = self.ledger.latest_block_height();
        // NOTE(review): this unwrap assumes CONSENSUS_VERSION is defined for every reachable
        // block height — confirm against the snarkVM network parameters.
        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
        // The commit SHA is only shared in dev mode or once consensus V12 is active,
        // and only if a commit hash is available in this build at all.
        let snarkos_sha = match (self.is_dev(), consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
            (true, _, Some(sha)) => Some(sha),
            (_, true, Some(sha)) => Some(sha),
            _ => None,
        };
        // Send a challenge request to the peer.
        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;

        /* Step 2: Receive the peer's challenge response followed by the challenge request. */

        // Listen for the challenge response message.
        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
        // Listen for the challenge request message.
        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);

        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self
            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
            .await
        {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            return Err(ConnectError::application(reason));
        }

        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            return Err(reason.into_connect_error(peer_addr));
        }

        /* Step 3: Send the challenge response. */

        // Sign the counterparty nonce.
        // The signed payload binds the peer's request nonce to our fresh response nonce,
        // preventing replay of a previously captured response.
        let response_nonce: u64 = rng.r#gen();
        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
            return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
        };
        // Send the challenge response.
        let our_response =
            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;

        Ok(peer_request)
    }

    /// The connection responder side of the handshake.
    ///
    /// Mirrors the initiator flow: receives the peer's `ChallengeRequest`, replies with our
    /// `ChallengeResponse` followed by our own `ChallengeRequest`, then verifies the peer's
    /// `ChallengeResponse`. On success, `peer_ip` holds the peer's listening address.
    async fn handshake_inner_responder<'a>(
        &'a self,
        peer_addr: SocketAddr,
        peer_ip: &mut Option<SocketAddr>,
        restrictions_id: Field<N>,
        stream: &'a mut TcpStream,
    ) -> Result<ChallengeRequest<N>, ConnectError> {
        // Construct the stream.
        let mut framed = Framed::new(stream, EventCodec::<N>::handshake());

        /* Step 1: Receive the challenge request. */

        // Listen for the challenge request message.
        let peer_request = expect_event!(Event::ChallengeRequest, framed, peer_addr);

        // Ensure the address is not the same as this node.
        if self.account.address() == peer_request.address {
            return Err(ConnectError::SelfConnect { address: peer_addr });
        }

        // Obtain the peer's listening address.
        // Written through the out-parameter so the caller learns the listener address
        // even if the handshake fails beyond this point.
        *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
        let peer_ip = peer_ip.unwrap();

        // Knowing the peer's listening address, ensure it is allowed to connect.
        if let Err(reason) = self.ensure_peer_is_allowed(peer_ip) {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            return Err(reason.into_connect_error(peer_addr));
        }

        // Introduce the peer into the peer pool.
        self.add_connecting_peer(peer_ip)?;

        // Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            return Err(reason.into_connect_error(peer_addr));
        }

        /* Step 2: Send the challenge response followed by own challenge request. */

        // Initialize an RNG.
        let rng = &mut rand::rngs::OsRng;

        // Sign the counterparty nonce.
        // As on the initiator side, the payload binds the peer's request nonce to our
        // fresh response nonce.
        let response_nonce: u64 = rng.r#gen();
        let data = [peer_request.nonce.to_le_bytes(), response_nonce.to_le_bytes()].concat();
        let Ok(our_signature) = self.account.sign_bytes(&data, rng) else {
            return Err(ConnectError::other(anyhow!("Failed to sign the challenge request nonce")));
        };
        // Send the challenge response.
        let our_response =
            ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce };
        send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?;

        // Sample a random nonce.
        let our_nonce = rng.r#gen();
        // Determine the snarkOS SHA to send to the peer.
        let current_block_height = self.ledger.latest_block_height();
        // NOTE(review): same unwrap assumption as in `handshake_inner_initiator` — confirm.
        let consensus_version = N::CONSENSUS_VERSION(current_block_height).unwrap();
        let snarkos_sha = match (self.is_dev(), consensus_version >= ConsensusVersion::V12, get_repo_commit_hash()) {
            (true, _, Some(sha)) => Some(sha),
            (_, true, Some(sha)) => Some(sha),
            _ => None,
        };
        // Send the challenge request.
        let our_request = ChallengeRequest::new(self.local_ip().port(), self.account.address(), our_nonce, snarkos_sha);
        send_event(&mut framed, peer_addr, Event::ChallengeRequest(our_request)).await?;

        /* Step 3: Receive the challenge response. */

        // Listen for the challenge response message.
        let peer_response = expect_event!(Event::ChallengeResponse, framed, peer_addr);
        // Verify the challenge response. If a disconnect reason was returned, send the disconnect message and abort.
        if let Some(reason) = self
            .verify_challenge_response(peer_addr, peer_request.address, peer_response, restrictions_id, our_nonce)
            .await
        {
            send_event(&mut framed, peer_addr, reason.into()).await?;
            Err(reason.into_connect_error(peer_addr))
        } else {
            Ok(peer_request)
        }
    }

    /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid.
    ///
    /// Checks, in order: protocol version, trusted-peers-only mode, committee membership
    /// (bootstrap peers are exempt), and duplicate Aleo-address connections.
    #[must_use]
    fn verify_challenge_request(&self, peer_addr: SocketAddr, event: &ChallengeRequest<N>) -> Option<DisconnectReason> {
        // Retrieve the components of the challenge request.
        let &ChallengeRequest { version, listener_port, address, nonce: _, ref snarkos_sha } = event;
        // Log (but don't act on) any commit-SHA mismatch between this node and the peer.
        log_repo_sha_comparison(peer_addr, snarkos_sha, CONTEXT);

        let listener_addr = SocketAddr::new(peer_addr.ip(), listener_port);

        // Ensure the event protocol version is not outdated.
        if version < Event::<N>::VERSION {
            return Some(DisconnectReason::OutdatedClientVersion);
        }
        // If the node is in trusted peers only mode, ensure the peer is trusted.
        if self.trusted_peers_only && !self.is_trusted(listener_addr) {
            warn!("{CONTEXT} Dropping '{peer_addr}' for being an untrusted validator ({address})");
            return Some(DisconnectReason::NoExternalPeersAllowed);
        }
        // Bootstrap peers bypass the committee-membership check below.
        if !bootstrap_peers::<N>(self.dev().is_some()).contains(&listener_addr) {
            // Ensure the address is a current committee member.
            if !self.is_authorized_validator_address(address) {
                return Some(DisconnectReason::UnauthorizedValidator);
            }
        }

        // Ensure the address is not already connected.
        if self.is_connected_address(address) {
            return Some(DisconnectReason::AlreadyConnectedToAleoAddress);
        }

        None
    }

    /// Verifies the given challenge response. Returns a disconnect reason if the response is invalid.
    ///
    /// Checks the restrictions ID, deserializes the signature off the async executor, and
    /// verifies it over the concatenation of our nonce and the peer's response nonce.
    #[must_use]
    async fn verify_challenge_response(
        &self,
        peer_addr: SocketAddr,
        peer_address: Address<N>,
        response: ChallengeResponse<N>,
        expected_restrictions_id: Field<N>,
        expected_nonce: u64,
    ) -> Option<DisconnectReason> {
        // Retrieve the components of the challenge response.
        let ChallengeResponse { restrictions_id, signature, nonce } = response;

        // Verify the restrictions ID.
        if restrictions_id != expected_restrictions_id {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (incorrect restrictions ID)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        }
        // Perform the deferred non-blocking deserialization of the signature.
        let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (cannot deserialize the signature)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        };
        // Verify the signature.
        // The message layout ([expected_nonce, nonce]) must match the signing side's
        // [peer_request.nonce, response_nonce] layout in the handshake functions above.
        if !signature.verify_bytes(&peer_address, &[expected_nonce.to_le_bytes(), nonce.to_le_bytes()].concat()) {
            warn!("{CONTEXT} Handshake with '{peer_addr}' failed (invalid signature)");
            return Some(DisconnectReason::InvalidChallengeResponse);
        }
        None
    }
}
1766
#[cfg(test)]
mod prop_tests {
    use crate::{
        Gateway,
        MAX_WORKERS,
        MEMORY_POOL_PORT,
        Worker,
        helpers::{Storage, init_primary_channels, init_worker_channels},
    };

    use snarkos_account::Account;
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkos_node_bft_storage_service::BFTMemoryService;
    use snarkos_node_network::PeerPoolHandling;
    use snarkos_node_tcp::P2P;
    use snarkos_utilities::NodeDataDir;

    use snarkvm::{
        ledger::{
            committee::{
                Committee,
                prop_tests::{CommitteeContext, ValidatorSet},
                test_helpers::sample_committee_for_round_and_members,
            },
            narwhal::{BatchHeader, batch_certificate::test_helpers::sample_batch_certificate_for_round},
        },
        prelude::{MainnetV0, PrivateKey},
        utilities::TestRng,
    };

    use indexmap::{IndexMap, IndexSet};
    use proptest::{
        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any, any_with},
        sample::Selector,
    };
    use std::{
        fmt::{Debug, Formatter},
        net::{IpAddr, Ipv4Addr, SocketAddr},
        sync::Arc,
    };
    use test_strategy::proptest;

    type CurrentNetwork = MainnetV0;

    impl Debug for Gateway<CurrentNetwork> {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            // TODO implement Debug properly and move it over to production code
            f.debug_tuple("Gateway").field(&self.account.address()).field(&self.tcp.config()).finish()
        }
    }

    /// The two addressing modes a test gateway can be built with: a dev node index
    /// (mapped to a localhost port offset) or an explicit production socket address.
    #[derive(Debug, test_strategy::Arbitrary)]
    enum GatewayAddress {
        Dev(u8),
        Prod(Option<SocketAddr>),
    }

    impl GatewayAddress {
        /// Returns the socket address in production mode, `None` in dev mode.
        fn ip(&self) -> Option<SocketAddr> {
            if let GatewayAddress::Prod(ip) = self {
                return *ip;
            }
            None
        }

        /// Returns the dev node index (used as a port offset) in dev mode, `None` otherwise.
        fn port(&self) -> Option<u16> {
            if let GatewayAddress::Dev(port) = self {
                return Some(*port as u16);
            }
            None
        }
    }

    impl Arbitrary for Gateway<CurrentNetwork> {
        type Parameters = ();
        type Strategy = BoxedStrategy<Gateway<CurrentNetwork>>;

        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
            any_valid_dev_gateway()
                .prop_map(|(storage, _, private_key, address)| {
                    Gateway::new(
                        Account::try_from(private_key).unwrap(),
                        storage.clone(),
                        storage.ledger().clone(),
                        address.ip(),
                        &[],
                        false,
                        NodeDataDir::new_test(None),
                        address.port(),
                    )
                    .unwrap()
                })
                .boxed()
        }
    }

    type GatewayInput = (Storage<CurrentNetwork>, CommitteeContext, PrivateKey<CurrentNetwork>, GatewayAddress);

    /// Strategy producing a valid dev-mode gateway input: a storage instance, its committee
    /// context, a committee member's private key, and a dev address.
    fn any_valid_dev_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    0u8..,
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, GatewayAddress::Dev(d)))
            })
            .boxed()
    }

    /// Strategy producing a valid production-mode gateway input, with an arbitrary
    /// (possibly absent) listener socket address.
    fn any_valid_prod_gateway() -> BoxedStrategy<GatewayInput> {
        (any::<CommitteeContext>(), any::<Selector>())
            .prop_flat_map(|(context, account_selector)| {
                let CommitteeContext(_, ValidatorSet(validators)) = context.clone();
                (
                    any_with::<Storage<CurrentNetwork>>(context.clone()),
                    Just(context),
                    Just(account_selector.select(validators)),
                    any::<Option<SocketAddr>>(),
                )
                    .prop_map(|(a, b, c, d)| (a, b, c.private_key, GatewayAddress::Prod(d)))
            })
            .boxed()
    }

    #[proptest]
    fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
        let (storage, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            NodeDataDir::new_test(None),
            dev.port(),
        )
        .unwrap();
        // In dev mode, the listener binds to localhost at MEMORY_POOL_PORT + node index.
        let tcp_config = gateway.tcp().config();
        assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
        assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size() * 10);
        assert_eq!(gateway.account().address(), account.address());
    }

    #[proptest]
    fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
        let (storage, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            NodeDataDir::new_test(None),
            dev.port(),
        )
        .unwrap();
        // In production mode, an explicit address is honored; otherwise the gateway
        // binds to the unspecified address on the default memory pool port.
        let tcp_config = gateway.tcp().config();
        if let Some(socket_addr) = dev.ip() {
            assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
            assert_eq!(tcp_config.desired_listening_port, Some(socket_addr.port()));
        } else {
            assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)));
            assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT));
        }
        assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size() * 10);
        assert_eq!(gateway.account().address(), account.address());
    }

    #[proptest(async = "tokio")]
    async fn gateway_start(
        #[strategy(any_valid_dev_gateway())] input: GatewayInput,
        #[strategy(0..MAX_WORKERS)] workers_count: u8,
    ) {
        let (storage, committee, private_key, dev) = input;
        let committee = committee.0;
        let worker_storage = storage.clone();
        let account = Account::try_from(private_key).unwrap();

        let gateway = Gateway::new(
            account,
            storage.clone(),
            storage.ledger().clone(),
            dev.ip(),
            &[],
            false,
            NodeDataDir::new_test(None),
            dev.port(),
        )
        .unwrap();

        let (primary_sender, _) = init_primary_channels();

        let (workers, worker_senders) = {
            // Construct a map of the worker senders.
            let mut tx_workers = IndexMap::new();
            let mut workers = IndexMap::new();

            // Initialize the workers.
            for id in 0..workers_count {
                // Construct the worker channels.
                let (tx_worker, rx_worker) = init_worker_channels();
                // Construct the worker instance.
                let ledger = Arc::new(MockLedgerService::new(committee.clone()));
                let worker =
                    Worker::new(id, Arc::new(gateway.clone()), worker_storage.clone(), ledger, Default::default())
                        .unwrap();
                // Run the worker instance.
                worker.run(rx_worker);

                // Add the worker and the worker sender to maps
                workers.insert(id, worker);
                tx_workers.insert(id, tx_worker);
            }
            (workers, tx_workers)
        };

        gateway.run(primary_sender, worker_senders, None).await;
        // The gateway should be listening at the expected dev-mode address with all workers registered.
        assert_eq!(
            gateway.local_ip(),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
        );
        assert_eq!(gateway.num_workers(), workers.len() as u8);
    }

    #[proptest]
    fn test_is_authorized_validator(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
        let rng = &mut TestRng::default();

        // Initialize the round parameters.
        let current_round = 2;
        let committee_size = 4;
        let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let (_, _, private_key, dev) = input;
        let account = Account::try_from(private_key).unwrap();

        // Sample the certificates.
        let mut certificates = IndexSet::new();
        for _ in 0..committee_size {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        let addresses: Vec<_> = certificates.iter().map(|certificate| certificate.author()).collect();
        // Initialize the committee.
        let committee = sample_committee_for_round_and_members(current_round, addresses, rng);
        // Sample extra certificates from non-committee members.
        for _ in 0..committee_size {
            certificates.insert(sample_batch_certificate_for_round(current_round, rng));
        }
        // Initialize the ledger.
        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        // Initialize the storage.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Initialize the gateway.
        let gateway = Gateway::new(
            account.clone(),
            storage.clone(),
            ledger.clone(),
            dev.ip(),
            &[],
            false,
            NodeDataDir::new_test(None),
            dev.port(),
        )
        .unwrap();
        // Insert certificate to the storage.
        for certificate in certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Check that exactly the current committee members (the first `committee_size`
        // certificates, in insertion order) are authorized validators.
        // Fix: iterate directly instead of `0..certificates.clone().len()`, which cloned
        // the entire IndexSet just to read its length.
        for (i, certificate) in certificates.iter().enumerate() {
            let is_authorized = gateway.is_authorized_validator_address(certificate.author());
            if i < committee_size {
                assert!(is_authorized);
            } else {
                assert!(!is_authorized);
            }
        }
    }
}